Author: asankha
Date: Fri Apr 25 01:07:36 2008
New Revision: 16145

Log:

Fix issues with maintenance mode/graceful shutdown and core JMX control


Modified:
   trunk/esb/java/modules/core/src/main/java/org/wso2/esb/ServiceBusManager.java
   
trunk/esb/java/modules/core/src/main/java/org/wso2/esb/jmx/mbean/ServiceBusControl.java
   trunk/esb/java/repository/conf/sample/resources/misc/server/axis2.xml

Modified: 
trunk/esb/java/modules/core/src/main/java/org/wso2/esb/ServiceBusManager.java
==============================================================================
--- 
trunk/esb/java/modules/core/src/main/java/org/wso2/esb/ServiceBusManager.java   
    (original)
+++ 
trunk/esb/java/modules/core/src/main/java/org/wso2/esb/ServiceBusManager.java   
    Fri Apr 25 01:07:36 2008
@@ -21,11 +21,17 @@
 import org.apache.axiom.om.xpath.AXIOMXPath;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.TransportInDescription;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.transport.TransportListener;
+import org.apache.axis2.transport.TransportSender;
+import org.apache.axis2.AxisFault;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.derby.drda.NetworkServerControl;
 import org.apache.synapse.ServerManager;
 import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.transport.base.ManagementSupport;
 import org.apache.synapse.config.SynapseConfiguration;
 import org.apache.synapse.core.SynapseEnvironment;
 import org.apache.synapse.registry.Registry;
@@ -90,6 +96,8 @@
     /** Save the TCCL of the initial thread that starts the ESB for future 
use. When JMX calls are
      * received via RMI connections, re-start etc may otherwise fail due to 
classloading issues */
     private ClassLoader classLoader = null;
+    /** Is the service bus manager (i.e. this) started? */
+    private boolean serviceBusManagerStarted = false;
 
     private ServiceBusManager() {}
 
@@ -102,15 +110,17 @@
 
     /**
      * Perform one time configuration for the JVM such as Log4J 
initialization, and core
-     * JMX Setup of the ServerView
+     * JMX Setup of the ServerView MBean
      */
     public void init() {
         if (alreadyInitialized) {
             return;
         }
 
+        // save web app classloader
         classLoader = Thread.currentThread().getContextClassLoader();
 
+        // initialize log4j
         URL log4jURL = null;
         try {
             log4jURL = new URL("file:" + sbc.getEsbHome() + File.separator
@@ -123,7 +133,9 @@
         // register a JVM shutdown hook
         Thread shutdownHook = new Thread() {
             public void run() {
-                shutdown();
+                if (serviceBusManagerStarted) {
+                    shutdown();
+                }
             }
         };
         shutdownHook.setName("WSO2 ESB Shutdown Hook");
@@ -131,16 +143,26 @@
 
         HibernateConfigCache.clearCache();
 
+        // start JMX
         startJMXService();
         registerMBean(
             ManagementFactory.getPlatformMBeanServer(), new 
ServiceBusControl(),
             sbc.getJmxAgentName() + ":Type=Admin,ConnectorName=ServerAdmin");
+        // ask Synapse transports to use the ESB agent name
+        System.setProperty(ServiceBusConstants.JMX_AGENT_NAME, 
sbc.getJmxAgentName());
 
         alreadyInitialized = true;
+        serviceBusManagerStarted = true;
     }
 
+    /**
+     * Start the WSO2 ESB. This method can be invoked multiple times within a 
JVM to re-start
+     * the ESB after an invocation of the stop() method. However, a shutdown() 
would terminate
+     * the core ServerViewMBean and will prevent any further start() 
invocations over JMX
+     */
     public void start() {
         if (!alreadyInitialized) {
+            // if one time (per JVM) initialization has not yet taken place, 
do it..
             init();
         }
 
@@ -161,24 +183,223 @@
         // generate UI pages and start JMX
         generateUIPages();
         registerMBeans();
+        log.info("[ESB] Start request completed");
     }
 
+    /**
+     * Stops the ESB, and allow it to be re-started via the start() method 
subsequently. This
+     * will stop the Synapse engine (and transports) and also unregister all 
JMX MBeans, leaving
+     * only the core ServerView MBean that could be used to restart the ESB
+     * @throws ServiceBusException
+     */
     public void stop() throws ServiceBusException {
         log.info("Stopping the WSO2 Enterprise Service Bus..");
         stopStatisticsReporter();
         stopDataBaseServer();
         ServerManager.getInstance().stop();
         unregisterMBeans();
+        log.info("[ESB] Stop request completed");
     }
 
+    /**
+     * Stop the ESB and the ServerView MBean and JMX control, and shutdown the 
JVM
+     */
     public void shutdown() {
         stop();
         stopJMXService();
         log.info("[ESB] Shutdown completed");
+        serviceBusManagerStarted = false;
+
+        // wait a few seconds more just to be really sure..
+        try {
+            Thread.sleep(3000);
+        } catch (InterruptedException ignore) {}
+
+        // if this was invoked by the shutdown hook, we should not call 
System.exit
+        if (!Thread.currentThread().getName().endsWith("Hook")) {
+            System.exit(0);
+        }
     }
 
-    public void shutdownGracefully(long millis) {
-        // TODO
+    /**
+     * Gracefully shut down the ESB.
+     *
+     * Puts every transport that supports it into maintenance mode via startMaintenance(),
+     * then polls every two seconds until no transport listener or sender reports active
+     * threads or queued requests and no callbacks are pending. Once that state is reached,
+     * or once the maximum wait has elapsed, the ESB is stopped via stop().
+     *
+     * @param waitMillis the maximum number of ms to wait until a graceful shutdown is achieved,
+     * before forcing a shutdown
+     */
+    public void shutdownGracefully(long waitMillis) {
+
+        long startTime = System.currentTimeMillis();
+        long endTime = startTime + waitMillis;
+        log.info("Requesting a graceful shutdown at : " + new Date() + " in a maximum of : " +
+            (waitMillis/1000) + " seconds");
+
+        // put transports into maintenance
+        startMaintenance();
+
+        // wait till listeners and senders are idle, or until the deadline passes
+        boolean safeToShutdown = false;
+
+        while (!safeToShutdown) {
+
+            int pendingThreads = 0;
+            Map<String, TransportInDescription> trpIns =
+                configurationContext.getAxisConfiguration().getTransportsIn();
+            Map<String, TransportOutDescription> trpOuts =
+                configurationContext.getAxisConfiguration().getTransportsOut();
+
+            for (TransportInDescription trpIn : trpIns.values()) {
+                TransportListener trpLst = trpIn.getReceiver();
+
+                // only transports exposing ManagementSupport can report their load
+                if (trpLst instanceof ManagementSupport) {
+                    int inUse = ((ManagementSupport) trpLst).getActiveThreadCount();
+                    int inQue = ((ManagementSupport) trpLst).getQueueSize();
+
+                    if (inUse+inQue > 0) {
+                        log.info("Transport Listsner : " + trpIn.getName() +
+                            " currently using : " + inUse + " threads with " +
+                            inQue + " requests already queued..");
+
+                        // accumulate over all listeners; a plain assignment here would
+                        // discard the load reported by previously inspected listeners
+                        pendingThreads += (inUse + inQue);
+                    }
+                }
+            }
+
+            for (TransportOutDescription trpOut : trpOuts.values()) {
+                TransportSender trpSnd = trpOut.getSender();
+
+                if (trpSnd instanceof ManagementSupport) {
+                    int inUse = ((ManagementSupport) trpSnd).getActiveThreadCount();
+                    int inQue = ((ManagementSupport) trpSnd).getQueueSize();
+
+                    if (inUse+inQue > 0) {
+                        log.info("Transport Sender : " + trpOut.getName() +
+                            " currently using : " + inUse + " threads with " +
+                            inQue + " requests already queued..");
+
+                        pendingThreads += (inUse + inQue);
+                    }
+                }
+            }
+
+            int pendingCallbacks = ServerManager.getInstance().pendingCallbacks();
+            if (pendingCallbacks > 0) {
+                log.info("Waiting for : " + pendingCallbacks + " callbacks/replies..");
+            }
+
+            // safe to shutdown is when used transport threads and callbacks are all zero
+            safeToShutdown = (pendingThreads + pendingCallbacks == 0);
+
+            if (safeToShutdown) {
+                log.info("All transport threads are idle and no pending callbacks..");
+
+            } else if (System.currentTimeMillis() > endTime) {
+                log.info("The instance could not be gracefully shutdown in : " +
+                    (waitMillis/1000) + " seconds. Shutdown immidiate..");
+                // deadline passed: leave the loop and stop regardless of pending work
+                safeToShutdown = true;
+
+            } else {
+                log.info("Waiting for a maximum of another " +
+                    (endTime - System.currentTimeMillis())/1000 +
+                    " seconds until transport threads to become idle, and callbacks complete..");
+                try {
+                    Thread.sleep(2000);
+                } catch (InterruptedException ignore) {
+                    // an early wake-up is harmless: the loop re-checks load and deadline
+                }
+            }
+        }
+
+        // stop the ESB on both exit paths; previously only the timeout path stopped it,
+        // leaving a successfully drained instance paused but still running
+        stop();
+
+        log.info("Graceful shutdown request completed in " +
+            (System.currentTimeMillis() - startTime)/1000 + " seconds");
+    }
+
+    /**
+     * Put transport listeners and senders into maintenance mode.
+     *
+     * Calls pause() on every transport listener and sender registered in the Axis2
+     * configuration that implements ManagementSupport; other transports are left untouched.
+     * A failure to pause one transport is logged and does not prevent the remaining
+     * transports from being paused.
+     */
+    public void startMaintenance() {
+        log.info("Putting transport listners and senders into maintenence mode..");
+
+        // put listeners into maintenance
+        Map<String, TransportInDescription> trpIns =
+            configurationContext.getAxisConfiguration().getTransportsIn();
+
+        for (TransportInDescription trpIn : trpIns.values()) {
+            TransportListener trpLst = trpIn.getReceiver();
+            if (trpLst instanceof ManagementSupport) {
+                try {
+                    ((ManagementSupport) trpLst).pause();
+                } catch (AxisFault axisFault) {
+                    // pass the fault to the logger so the cause is not lost
+                    log.error("Error putting transport listener for : "
+                        + trpIn.getName() + " into maintenence", axisFault);
+                }
+            }
+        }
+
+        // put senders into maintenance
+        Map<String, TransportOutDescription> trpOuts =
+            configurationContext.getAxisConfiguration().getTransportsOut();
+        for (TransportOutDescription trpOut : trpOuts.values()) {
+            TransportSender trpSnd = trpOut.getSender();
+            if (trpSnd instanceof ManagementSupport) {
+                try {
+                    ((ManagementSupport) trpSnd).pause();
+                } catch (AxisFault axisFault) {
+                    // pass the fault to the logger so the cause is not lost
+                    log.error("Error putting transport sender for : "
+                        + trpOut.getName() + " into maintenence", axisFault);
+                }
+            }
+        }
+
+        // TODO: put tasks on hold
+
+        log.info("[ESB] Entered maintenence mode");
+    }
+
+    /**
+     * Resume normal operations for transports already entered into maintenance mode.
+     *
+     * Calls resume() on every transport listener and sender registered in the Axis2
+     * configuration that implements ManagementSupport; other transports are left untouched.
+     * A failure to resume one transport is logged and does not prevent the remaining
+     * transports from being resumed.
+     */
+    public void endMaintenance() {
+        log.info("Resuming transport listners and senders from maintenence mode..");
+
+        // resume listeners
+        Map<String, TransportInDescription> trpIns =
+            configurationContext.getAxisConfiguration().getTransportsIn();
+
+        for (TransportInDescription trpIn : trpIns.values()) {
+            TransportListener trpLst = trpIn.getReceiver();
+            if (trpLst instanceof ManagementSupport) {
+                try {
+                    ((ManagementSupport) trpLst).resume();
+                } catch (AxisFault axisFault) {
+                    // report a resume failure (the message previously claimed a failure
+                    // to enter maintenance) and keep the cause
+                    log.error("Error resuming transport listener for : "
+                        + trpIn.getName() + " from maintenance", axisFault);
+                }
+            }
+        }
+
+        // resume senders
+        Map<String, TransportOutDescription> trpOuts =
+            configurationContext.getAxisConfiguration().getTransportsOut();
+        for (TransportOutDescription trpOut : trpOuts.values()) {
+            TransportSender trpSnd = trpOut.getSender();
+            if (trpSnd instanceof ManagementSupport) {
+                try {
+                    ((ManagementSupport) trpSnd).resume();
+                } catch (AxisFault axisFault) {
+                    // report a resume failure and keep the cause
+                    log.error("Error resuming transport sender for : "
+                        + trpOut.getName() + " from maintenance", axisFault);
+                }
+            }
+        }
+
+        // TODO: resume tasks
+
+        log.info("[ESB] Resumed normal operations from maintenence mode");
+    }
 
     /**
@@ -370,7 +591,7 @@
      * Stop the statistics processing thread
      */
     private void stopStatisticsReporter() {
-        if (statisticsReporterThread != null) {
+        if (statisticsReporterThread != null && 
statisticsReporterThread.isAlive()) {
             statisticsReporterThread.shutdown();
             statisticsReporterThread.interrupt();
         }

Modified: 
trunk/esb/java/modules/core/src/main/java/org/wso2/esb/jmx/mbean/ServiceBusControl.java
==============================================================================
--- 
trunk/esb/java/modules/core/src/main/java/org/wso2/esb/jmx/mbean/ServiceBusControl.java
     (original)
+++ 
trunk/esb/java/modules/core/src/main/java/org/wso2/esb/jmx/mbean/ServiceBusControl.java
     Fri Apr 25 01:07:36 2008
@@ -50,15 +50,18 @@
         sbm.shutdown();
     }
 
-    public void shutdownGracefully(long millis) throws Exception {
+    public void shutdownGracefully(long secs) throws Exception {
         Thread.currentThread().setContextClassLoader(sbm.getClassLoader());
+        sbm.shutdownGracefully(secs * 1000);
     }
 
     public void startMaintenance() throws Exception {
         Thread.currentThread().setContextClassLoader(sbm.getClassLoader());
+        sbm.startMaintenance();
     }
 
     public void endMaintenance() throws Exception {
         Thread.currentThread().setContextClassLoader(sbm.getClassLoader());
+        sbm.endMaintenance();
     }
 }

Modified: trunk/esb/java/repository/conf/sample/resources/misc/server/axis2.xml
==============================================================================
--- trunk/esb/java/repository/conf/sample/resources/misc/server/axis2.xml       
(original)
+++ trunk/esb/java/repository/conf/sample/resources/misc/server/axis2.xml       
Fri Apr 25 01:07:36 2008
@@ -151,7 +151,7 @@
        <parameter name="non-blocking" locked="false">true</parameter>
         <parameter name="keystore" locked="false">
             <KeyStore>
-                <Location>identity.jks</Location>
+                
<Location>../../webapp/WEB-INF/classes/conf/identity.jks</Location>
                 <Type>JKS</Type>
                 <Password>password</Password>
                 <KeyPassword>password</KeyPassword>
@@ -159,7 +159,7 @@
         </parameter>
         <parameter name="truststore" locked="false">
             <TrustStore>
-                <Location>trust.jks</Location>
+                
<Location>../../webapp/WEB-INF/classes/conf/trust.jks</Location>
                 <Type>JKS</Type>
                 <Password>password</Password>
             </TrustStore>
@@ -211,7 +211,7 @@
         <parameter name="non-blocking" locked="false">true</parameter>
         <parameter name="keystore" locked="false">
             <KeyStore>
-                <Location>identity.jks</Location>
+                
<Location>../../webapp/WEB-INF/classes/conf/identity.jks</Location>
                 <Type>JKS</Type>
                 <Password>password</Password>
                 <KeyPassword>password</KeyPassword>
@@ -219,7 +219,7 @@
         </parameter>
         <parameter name="truststore" locked="false">
             <TrustStore>
-                <Location>trust.jks</Location>
+                
<Location>../../webapp/WEB-INF/classes/conf/trust.jks</Location>
                 <Type>JKS</Type>
                 <Password>password</Password>
             </TrustStore>

_______________________________________________
Esb-java-dev mailing list
[email protected]
http://wso2.org/cgi-bin/mailman/listinfo/esb-java-dev

Reply via email to