handling all thread pool shutting down scenarion from stratos common component 
itself


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/90008ddc
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/90008ddc
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/90008ddc

Branch: refs/heads/stratos-4.1.x
Commit: 90008ddc1fbfa0bc098524fc746326b15e42e4ca
Parents: cef4fe1
Author: Isuru Haththotuwa <[email protected]>
Authored: Sun Dec 6 11:58:39 2015 +0530
Committer: Isuru Haththotuwa <[email protected]>
Committed: Mon Dec 7 18:49:55 2015 +0530

----------------------------------------------------------------------
 .../common/internal/CommonServiceComponent.java |  12 +-
 .../common/threading/StratosThreadPool.java     | 146 +++++++++++++++++++
 2 files changed, 157 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/90008ddc/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/internal/CommonServiceComponent.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/internal/CommonServiceComponent.java
 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/internal/CommonServiceComponent.java
index 1059988..a29092c 100644
--- 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/internal/CommonServiceComponent.java
+++ 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/internal/CommonServiceComponent.java
@@ -26,6 +26,7 @@ import 
org.apache.stratos.common.clustering.impl.HazelcastDistributedObjectProvi
 import org.apache.stratos.common.constants.StratosConstants;
 import org.apache.stratos.common.services.ComponentStartUpSynchronizer;
 import org.apache.stratos.common.services.DistributedObjectProvider;
+import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.common.util.CommonUtil;
 import org.apache.stratos.common.util.StratosConfiguration;
 import org.osgi.framework.BundleContext;
@@ -135,7 +136,16 @@ public class CommonServiceComponent {
     }
 
     protected void deactivate(ComponentContext context) {
-        log.debug("Stratos common service is deactivated");
+
+        try {
+            // Shutdown all thread pools
+            StratosThreadPool.shutDownAllThreadPoolsGracefully();
+            StratosThreadPool.shutDownAllScheduledExecutorsGracefully();
+            log.debug("Stratos common service component is de-activated");
+        }
+        catch (Exception e) {
+            log.error("Could not de-activate Stratos common service 
component", e);
+        }
     }
 
     protected void setRegistryService(RegistryService registryService) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/90008ddc/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
index 459cd1d..50b53bb 100644
--- 
a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
+++ 
b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/threading/StratosThreadPool.java
@@ -23,7 +23,9 @@ package org.apache.stratos.common.threading;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.*;
 
 /**
@@ -85,4 +87,148 @@ public class StratosThreadPool {
         }
         return scheduledExecutorService;
     }
+
+    public static void shutDownAllThreadPoolsGracefully () {
+
+        int threadPoolCount = executorMap.size();
+        if (threadPoolCount == 0) {
+            log.info("No thread pools found to shut down");
+            return;
+        }
+
+        Set<Future<String>> threadPoolTerminatorFutures = new HashSet<>();
+        ExecutorService threadPoolTerminator = null;
+
+        try {
+            threadPoolTerminator = 
Executors.newFixedThreadPool(threadPoolCount);
+            for (Map.Entry<String, ThreadPoolExecutor> entry : 
executorMap.entrySet()) {
+                
threadPoolTerminatorFutures.add(threadPoolTerminator.submit(new 
GracefulThreadPoolTerminator(entry.getKey(),
+                        entry.getValue())));
+            }
+            // use the Future to block until shutting down is done
+            for (Future<String> threadPoolTerminatorFuture : 
threadPoolTerminatorFutures) {
+                removeThreadPoolFromCache(threadPoolTerminatorFuture.get());
+            }
+
+        } catch (InterruptedException e) {
+            log.error("Error in shutting down thread pools", e);
+        } catch (ExecutionException e) {
+            log.error("Error in shutting down thread pools", e);
+        } finally {
+            // if there are any remaining thread pools, shut down immediately
+            if (!executorMap.isEmpty()) {
+                for (Map.Entry<String, ThreadPoolExecutor> entry : 
executorMap.entrySet()) {
+                    entry.getValue().shutdownNow();
+                    removeThreadPoolFromCache(entry.getKey());
+                }
+            }
+
+            // shut down the threadPoolTerminator threads
+            threadPoolTerminator.shutdownNow();
+        }
+    }
+
+    public static void shutDownAllScheduledExecutorsGracefully () {
+
+        int threadPoolCount = scheduledServiceMap.size();
+        if (threadPoolCount == 0) {
+            log.info("No thread pools found to shut down");
+            return;
+        }
+
+        Set<Future<String>> threadPoolTerminatorFutures = new HashSet<>();
+        ExecutorService threadPoolTerminator = null;
+
+        try {
+            threadPoolTerminator = 
Executors.newFixedThreadPool(threadPoolCount);
+            for (Map.Entry<String, ScheduledExecutorService> entry : 
scheduledServiceMap.entrySet()) {
+                
threadPoolTerminatorFutures.add(threadPoolTerminator.submit(new 
GracefulScheduledThreadPoolTerminator(entry.getKey(),
+                        entry.getValue())));
+            }
+            // use the Future to block until shutting down is done
+            for (Future<String> threadPoolTerminatorFuture : 
threadPoolTerminatorFutures) {
+                
removeScheduledThreadPoolFromCache(threadPoolTerminatorFuture.get());
+            }
+
+        } catch (InterruptedException e) {
+            log.error("Error in shutting down thread pools", e);
+        } catch (ExecutionException e) {
+            log.error("Error in shutting down thread pools", e);
+        } finally {
+            // if there are any remaining thread pools, shut down immediately
+            if (!scheduledServiceMap.isEmpty()) {
+                for (Map.Entry<String, ScheduledExecutorService> entry : 
scheduledServiceMap.entrySet()) {
+                    entry.getValue().shutdownNow();
+                    removeScheduledThreadPoolFromCache(entry.getKey());
+                }
+            }
+
+            // shut down the threadPoolTerminator threads
+            threadPoolTerminator.shutdownNow();
+        }
+    }
+
+    private static void removeThreadPoolFromCache(String terminatedPoolId) {
+        if (executorMap.remove(terminatedPoolId) != null) {
+            log.info("Thread pool [id] " + terminatedPoolId + " is 
successfully shut down" +
+                    " and removed from the cache");
+        }
+    }
+
+    private static void removeScheduledThreadPoolFromCache(String 
terminatedPoolId) {
+        if (scheduledServiceMap.remove(terminatedPoolId) != null) {
+            log.info("Scheduled Thread pool [id] " + terminatedPoolId + " is 
successfully shut down" +
+                    " and removed from the cache");
+        }
+    }
+
+    private static class GracefulThreadPoolTerminator implements Callable {
+
+        private String threadPoolId;
+        private ThreadPoolExecutor executor;
+
+        public GracefulThreadPoolTerminator (String threadPoolId, 
ThreadPoolExecutor executor) {
+            this.threadPoolId = threadPoolId;
+            this.executor = executor;
+        }
+
+        @Override
+        public String call() throws Exception {
+            log.info("Shutting down thread pool " + threadPoolId);
+            // try to shut down gracefully
+            executor.shutdown();
+            // wait 10 secs till terminated
+            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+                log.info("Thread Pool [id] " + threadPoolId + " did not finish 
all tasks before " +
+                        "timeout, forcefully shutting down");
+                executor.shutdownNow();
+            }
+            return threadPoolId;
+        }
+    }
+
+    private static class GracefulScheduledThreadPoolTerminator implements 
Callable {
+
+        private String threadPoolId;
+        private ScheduledExecutorService scheduledExecutor;
+
+        public GracefulScheduledThreadPoolTerminator (String threadPoolId, 
ScheduledExecutorService scheduledExecutor) {
+            this.threadPoolId = threadPoolId;
+            this.scheduledExecutor = scheduledExecutor;
+        }
+
+        @Override
+        public String call() throws Exception {
+            log.info("Shutting down scheduled thread pool " + threadPoolId);
+            // try to shut down gracefully
+            scheduledExecutor.shutdown();
+            // wait 10 secs till terminated
+            if (!scheduledExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+                log.info("Scheduled thread Pool [id] " + threadPoolId + " did 
not finish all tasks before " +
+                        "timeout, forcefully shutting down");
+                scheduledExecutor.shutdownNow();
+            }
+            return threadPoolId;
+        }
+    }
 }

Reply via email to