Author: cwiklik Date: Tue Feb 22 02:55:34 2011 New Revision: 1073208 URL: http://svn.apache.org/viewvc?rev=1073208&view=rev Log: UIMA-2065 Added shutdown hook to enable clean shutdown
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=1073208&r1=1073207&r2=1073208&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original) +++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Tue Feb 22 02:55:34 2011 @@ -43,7 +43,9 @@ import javax.naming.InitialContext; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQMessageConsumer; import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.ActiveMQSession; import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.command.ActiveMQBytesMessage; import org.apache.activemq.command.ActiveMQDestination; @@ -225,16 +227,27 @@ public class BaseUIMAAsynchronousEngine_ } public void stop() { - if (!running) { - return; - } - super.stop(); synchronized (connectionMux) { - running = false; + super.doStop(); + if (!running) { + return; + } + running = false; if (super.serviceDelegate != null) { // Cancel all timers and purge lists super.serviceDelegate.cleanup(); } + if (sender != null) { + sender.doStop(); + } + if (initialized) { + try { + consumerSession.close(); + ((ActiveMQMessageConsumer)consumer).stop(); + consumer.close(); + } catch (Exception exx) {} + } + try { // 
SharedConnection object manages a single JMS connection to // the broker. If the client is scaled out in the same JVM, the @@ -267,9 +280,6 @@ public class BaseUIMAAsynchronousEngine_ } finally { sharedConnectionSemaphore.release(); } - if (sender != null) { - sender.doStop(); - } // Undeploy all containers undeploy(); if (UIMAFramework.getLogger(CLASS_NAME).isLoggable( @@ -279,13 +289,6 @@ public class BaseUIMAAsynchronousEngine_ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_undeployed_containers__INFO"); } - if (initialized) { - try { - consumerSession.close(); - consumer.close(); - } catch (JMSException exx) { - } - } // unregister client if (jmxManager != null) { jmxManager.unregisterMBean(clientJmxObjectName); @@ -543,6 +546,10 @@ public class BaseUIMAAsynchronousEngine_ */ public synchronized void initialize(Map anApplicationContext) throws ResourceInitializationException { + // Add ShutdownHook to make sure the connection to the + // broker is always closed on process exit. + Runtime.getRuntime().addShutdownHook( + new Thread(new UimaASShutdownHook(this))); // Check the version of uimaj that UIMA AS was built with, against the UIMA Core version. 
If not the same throw Exception if (!UimaAsVersion.getUimajFullVersionString().equals(UimaVersion.getFullVersionString())) { UIMAFramework.getLogger(CLASS_NAME).logrb( Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1073208&r1=1073207&r2=1073208&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original) +++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Tue Feb 22 02:55:34 2011 @@ -29,6 +29,8 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; @@ -195,6 +197,8 @@ public abstract class BaseUIMAAsynchrono protected static SharedConnection sharedConnection = null; + private ExecutorService exec = Executors.newFixedThreadPool(1); + abstract public String getEndPointName() throws Exception; abstract protected TextMessage createTextMessage() throws Exception; @@ -409,7 +413,7 @@ public abstract class BaseUIMAAsynchrono } } - public void stop() { + public void doStop() { synchronized (stopMux) { if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "stop", @@ -420,7 +424,8 @@ public abstract class BaseUIMAAsynchrono return; } - running = false; + exec.shutdownNow(); + casQueueProducerReady = 
false; if (serviceDelegate != null) { serviceDelegate.cancelDelegateTimer(); @@ -509,12 +514,12 @@ public abstract class BaseUIMAAsynchrono // Every thread requesting a CAS adds an entry to this // queue. CasQueueEntry entry = threadQueue.take(); - if (!running) { - return; // client API has been stopped - } CAS cas = null; long startTime = System.nanoTime(); // Wait for a free CAS instance + if (!running || asynchManager == null) { + return; // client API has been stopped + } if (remoteService) { cas = asynchManager.getNewCas("ApplicationCasPoolContext"); } else { @@ -1312,8 +1317,9 @@ public abstract class BaseUIMAAsynchrono if (exception != null && cachedRequest != null) { cachedRequest.setException(exception); if (exception instanceof AnalysisEngineProcessException - || (exception.getCause() != null && (exception.getCause() instanceof AnalysisEngineProcessException || exception - .getCause() instanceof ServiceShutdownException))) { + || (exception.getCause() != null && + (exception.getCause() instanceof AnalysisEngineProcessException || + exception.getCause() instanceof ServiceShutdownException))) { // Indicate that this is a process exception. cachedRequest.setProcessException(); } @@ -1649,58 +1655,78 @@ public abstract class BaseUIMAAsynchrono * Listener method receiving JMS Messages from the response queue. * */ - public void onMessage(Message message) { - try { - + public void onMessage(final Message message) { + // Process message in a separate thread. Previously the message was processed in ActiveMQ dispatch thread. + // This onMessage() method is called by ActiveMQ code from a critical region protected with a lock. The lock + // is only released if this method returns. Running in a dispatch thread caused a hang when an application + // decided to call System.exit() in any of its callback listener methods. 
The UIMA AS client adds a + // ShutdownHook to the JVM to enable orderly shutdown which includes stopping JMS Consumer, JMS Producer + // and finally stopping JMS Connection. The ShutdownHook support was added to the client in case the + // application doesn't call client's stop() method. Now, the hang was caused by the fact that the dispatch + // thread was used to call System.exit() which in turn executed client's ShutdownHook code. The ShutdownHook + // code runs in a separate thread, but the JVM blocks the dispatch thread until the ShutdownHook + // finishes. It never will though, since the ShutdownHook is calling ActiveMQSession.close() which tries to enter + // the same critical region that the dispatch thread is still stuck in. DEADLOCK. + // The code below uses a simple FixedThreadPool Executor with a single thread. This thread is reused instead of + // creating one on the fly. + exec.execute( new Runnable() { - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "onMessage", - JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_msg_FINEST", - new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) }); - } - if (!message.propertyExists(AsynchAEMessage.Command)) { - return; - } + public void run() { + try { + + + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "onMessage", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_msg_FINEST", + new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) }); + } + if (!message.propertyExists(AsynchAEMessage.Command)) { + return; + } + + int command = message.getIntProperty(AsynchAEMessage.Command); + if (AsynchAEMessage.CollectionProcessComplete == command) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, 
CLASS_NAME.getName(), "onMessage", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_cpc_reply_FINE", + new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) }); + } + handleCollectionProcessCompleteReply(message); + } else if (AsynchAEMessage.GetMeta == command) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_meta_reply_FINE", + new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) }); + } + handleMetadataReply(message); + } else if (AsynchAEMessage.Process == command) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_process_reply_FINE", + new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) }); + } + handleProcessReply(message, true, null); + } else if (AsynchAEMessage.ServiceInfo == command) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), + "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_received_service_info_FINEST", + new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) }); + } + handleServiceInfo(message); + } + } catch (Exception e) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), + "onMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAEE_exception__WARNING", e); + } - int command = message.getIntProperty(AsynchAEMessage.Command); - if (AsynchAEMessage.CollectionProcessComplete == command) { - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, 
CLASS_NAME.getName(), "onMessage", - JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_cpc_reply_FINE", - new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) }); - } - handleCollectionProcessCompleteReply(message); - } else if (AsynchAEMessage.GetMeta == command) { - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage", - JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_meta_reply_FINE", - new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) }); - } - handleMetadataReply(message); - } else if (AsynchAEMessage.Process == command) { - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "onMessage", - JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_received_process_reply_FINE", - new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) }); - } - handleProcessReply(message, true, null); - } else if (AsynchAEMessage.ServiceInfo == command) { - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), - "onMessage", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_received_service_info_FINEST", - new Object[] { message.getStringProperty(AsynchAEMessage.MessageFrom) }); } - handleServiceInfo(message); - } - } catch (Exception e) { - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), - "onMessage", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAEE_exception__WARNING", e); + } + }); - } } /** @@ -2410,7 +2436,11 @@ public abstract class BaseUIMAAsynchrono JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_error_while_sending_msg__WARNING", new Object[] { aDestination, aFailure }); } - stop(); + try { + stop(); + } catch( Exception e) { + 
e.printStackTrace(); + } } /** @@ -2609,11 +2639,17 @@ public abstract class BaseUIMAAsynchrono synchronized(destroyMux) { // Check if all clients have terminated and only than stop the shared connection if (getClientCount() == 0 && connection != null - && !((ActiveMQConnection) connection).isClosed()) { + && !((ActiveMQConnection) connection).isClosed() + && !((ActiveMQConnection) connection).isClosing()) { try { stop = true; connection.stop(); connection.close(); + while( !((ActiveMQConnection) connection).isClosed() ) { + try { + destroyMux.wait(100); + } catch( InterruptedException exx) {} + } } catch (Exception e) { /* ignore */ } @@ -2636,4 +2672,20 @@ public abstract class BaseUIMAAsynchrono } } } + public class UimaASShutdownHook implements Runnable { + UimaAsynchronousEngine asEngine=null; + public UimaASShutdownHook( UimaAsynchronousEngine asEngine) { + this.asEngine = asEngine; + } + public void run() { + try { + if ( asEngine != null ) { + asEngine.stop(); + } + } catch( Exception ex) { + ex.printStackTrace(); + } + } + + } }