Handling all thread pool shutdown scenarios from the Stratos common component itself
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/90008ddc Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/90008ddc Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/90008ddc Branch: refs/heads/stratos-4.1.x Commit: 90008ddc1fbfa0bc098524fc746326b15e42e4ca Parents: cef4fe1 Author: Isuru Haththotuwa <[email protected]> Authored: Sun Dec 6 11:58:39 2015 +0530 Committer: Isuru Haththotuwa <[email protected]> Committed: Mon Dec 7 18:49:55 2015 +0530 ---------------------------------------------------------------------- .../common/internal/CommonServiceComponent.java | 12 +- .../common/threading/StratosThreadPool.java | 146 +++++++++++++++++++ 2 files changed, 157 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/90008ddc/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/internal/CommonServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/internal/CommonServiceComponent.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/internal/CommonServiceComponent.java index 1059988..a29092c 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/internal/CommonServiceComponent.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/internal/CommonServiceComponent.java @@ -26,6 +26,7 @@ import org.apache.stratos.common.clustering.impl.HazelcastDistributedObjectProvi import org.apache.stratos.common.constants.StratosConstants; import org.apache.stratos.common.services.ComponentStartUpSynchronizer; import org.apache.stratos.common.services.DistributedObjectProvider; +import org.apache.stratos.common.threading.StratosThreadPool; import 
org.apache.stratos.common.util.CommonUtil; import org.apache.stratos.common.util.StratosConfiguration; import org.osgi.framework.BundleContext; @@ -135,7 +136,16 @@ public class CommonServiceComponent { } protected void deactivate(ComponentContext context) { - log.debug("Stratos common service is deactivated"); + + try { + // Shutdown all thread pools + StratosThreadPool.shutDownAllThreadPoolsGracefully(); + StratosThreadPool.shutDownAllScheduledExecutorsGracefully(); + log.debug("Stratos common service component is de-activated"); + } + catch (Exception e) { + log.error("Could not de-activate Stratos common service component", e); + } } protected void setRegistryService(RegistryService registryService) { http://git-wip-us.apache.org/repos/asf/stratos/blob/90008ddc/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java index 459cd1d..50b53bb 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java @@ -23,7 +23,9 @@ package org.apache.stratos.common.threading; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.*; /** @@ -85,4 +87,148 @@ public class StratosThreadPool { } return scheduledExecutorService; } + + public static void shutDownAllThreadPoolsGracefully () { + + int threadPoolCount = executorMap.size(); + if (threadPoolCount == 0) { + log.info("No thread pools found to shut down"); + return; + } 
+ + Set<Future<String>> threadPoolTerminatorFutures = new HashSet<>(); + ExecutorService threadPoolTerminator = null; + + try { + threadPoolTerminator = Executors.newFixedThreadPool(threadPoolCount); + for (Map.Entry<String, ThreadPoolExecutor> entry : executorMap.entrySet()) { + threadPoolTerminatorFutures.add(threadPoolTerminator.submit(new GracefulThreadPoolTerminator(entry.getKey(), + entry.getValue()))); + } + // use the Future to block until shutting down is done + for (Future<String> threadPoolTerminatorFuture : threadPoolTerminatorFutures) { + removeThreadPoolFromCache(threadPoolTerminatorFuture.get()); + } + + } catch (InterruptedException e) { + log.error("Error in shutting down thread pools", e); + } catch (ExecutionException e) { + log.error("Error in shutting down thread pools", e); + } finally { + // if there are any remaining thread pools, shut down immediately + if (!executorMap.isEmpty()) { + for (Map.Entry<String, ThreadPoolExecutor> entry : executorMap.entrySet()) { + entry.getValue().shutdownNow(); + removeThreadPoolFromCache(entry.getKey()); + } + } + + // shut down the threadPoolTerminator threads + threadPoolTerminator.shutdownNow(); + } + } + + public static void shutDownAllScheduledExecutorsGracefully () { + + int threadPoolCount = scheduledServiceMap.size(); + if (threadPoolCount == 0) { + log.info("No thread pools found to shut down"); + return; + } + + Set<Future<String>> threadPoolTerminatorFutures = new HashSet<>(); + ExecutorService threadPoolTerminator = null; + + try { + threadPoolTerminator = Executors.newFixedThreadPool(threadPoolCount); + for (Map.Entry<String, ScheduledExecutorService> entry : scheduledServiceMap.entrySet()) { + threadPoolTerminatorFutures.add(threadPoolTerminator.submit(new GracefulScheduledThreadPoolTerminator(entry.getKey(), + entry.getValue()))); + } + // use the Future to block until shutting down is done + for (Future<String> threadPoolTerminatorFuture : threadPoolTerminatorFutures) { + 
removeScheduledThreadPoolFromCache(threadPoolTerminatorFuture.get()); + } + + } catch (InterruptedException e) { + log.error("Error in shutting down thread pools", e); + } catch (ExecutionException e) { + log.error("Error in shutting down thread pools", e); + } finally { + // if there are any remaining thread pools, shut down immediately + if (!scheduledServiceMap.isEmpty()) { + for (Map.Entry<String, ScheduledExecutorService> entry : scheduledServiceMap.entrySet()) { + entry.getValue().shutdownNow(); + removeScheduledThreadPoolFromCache(entry.getKey()); + } + } + + // shut down the threadPoolTerminator threads + threadPoolTerminator.shutdownNow(); + } + } + + private static void removeThreadPoolFromCache(String terminatedPoolId) { + if (executorMap.remove(terminatedPoolId) != null) { + log.info("Thread pool [id] " + terminatedPoolId + " is successfully shut down" + + " and removed from the cache"); + } + } + + private static void removeScheduledThreadPoolFromCache(String terminatedPoolId) { + if (scheduledServiceMap.remove(terminatedPoolId) != null) { + log.info("Scheduled Thread pool [id] " + terminatedPoolId + " is successfully shut down" + + " and removed from the cache"); + } + } + + private static class GracefulThreadPoolTerminator implements Callable { + + private String threadPoolId; + private ThreadPoolExecutor executor; + + public GracefulThreadPoolTerminator (String threadPoolId, ThreadPoolExecutor executor) { + this.threadPoolId = threadPoolId; + this.executor = executor; + } + + @Override + public String call() throws Exception { + log.info("Shutting down thread pool " + threadPoolId); + // try to shut down gracefully + executor.shutdown(); + // wait 10 secs till terminated + if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { + log.info("Thread Pool [id] " + threadPoolId + " did not finish all tasks before " + + "timeout, forcefully shutting down"); + executor.shutdownNow(); + } + return threadPoolId; + } + } + + private static class 
GracefulScheduledThreadPoolTerminator implements Callable { + + private String threadPoolId; + private ScheduledExecutorService scheduledExecutor; + + public GracefulScheduledThreadPoolTerminator (String threadPoolId, ScheduledExecutorService scheduledExecutor) { + this.threadPoolId = threadPoolId; + this.scheduledExecutor = scheduledExecutor; + } + + @Override + public String call() throws Exception { + log.info("Shutting down scheduled thread pool " + threadPoolId); + // try to shut down gracefully + scheduledExecutor.shutdown(); + // wait 10 secs till terminated + if (!scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS)) { + log.info("Scheduled thread Pool [id] " + threadPoolId + " did not finish all tasks before " + + "timeout, forcefully shutting down"); + scheduledExecutor.shutdownNow(); + } + return threadPoolId; + } + } }
