Author: cwiklik Date: Wed Nov 2 18:42:11 2011 New Revision: 1196753 URL: http://svn.apache.org/viewvc?rev=1196753&view=rev Log: UIMA-1435 Modified client side of UIMA AS to manage multiple connections to brokers. Previously only one static connection per jvm was allowed.
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java?rev=1196753&r1=1196752&r2=1196753&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java (original) +++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java Wed Nov 2 18:42:11 2011 @@ -157,7 +157,7 @@ public class ActiveMQMessageSender exten public MessageProducer getMessageProducer() { if ( engine.running && engine.producerInitialized == false ) { try { - setConnection(engine.sharedConnection.getConnection()); + setConnection(engine.lookupConnection(getBrokerURL()).getConnection()); initializeProducer(); engine.producerInitialized = true; } catch( Exception e) { Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=1196753&r1=1196752&r2=1196753&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original) +++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Wed Nov 2 18:42:11 2011 @@ -30,7 +30,6 @@ import java.util.concurrent.Semaphore; import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.Destination; -import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageProducer; @@ -38,18 +37,13 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.management.ObjectName; -import javax.naming.Context; import javax.naming.InitialContext; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQMessageConsumer; import org.apache.activemq.ActiveMQPrefetchPolicy; -import org.apache.activemq.ActiveMQSession; -import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.command.ActiveMQBytesMessage; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQTempDestination; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.uima.UIMAFramework; import org.apache.uima.UIMA_IllegalArgumentException; @@ -64,7 +58,6 @@ import org.apache.uima.aae.controller.Co import org.apache.uima.aae.controller.ControllerLifecycle; import org.apache.uima.aae.controller.Endpoint; import org.apache.uima.aae.controller.UimacppServiceController; -import org.apache.uima.aae.delegate.Delegate; import org.apache.uima.aae.delegate.Delegate.DelegateEntry; import org.apache.uima.aae.error.AsynchAEException; import org.apache.uima.aae.error.UimaASMetaRequestTimeout; @@ -74,9 +67,7 @@ import org.apache.uima.aae.message.UIMAM import org.apache.uima.adapter.jms.JmsConstants; import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer; import org.apache.uima.adapter.jms.activemq.UimaEEAdminSpringContext; -import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientRequest; import org.apache.uima.adapter.jms.service.Dd2spring; -import org.apache.uima.analysis_engine.AnalysisEngineDescription; import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData; import org.apache.uima.cas.CAS; import org.apache.uima.impl.UimaVersion; @@ -98,8 +89,6 @@ public class BaseUIMAAsynchronousEngine_ private MessageProducer producer; - private String brokerURI = null; - private Session session = null; private Session consumerSession = null; @@ -137,6 +126,7 @@ public class BaseUIMAAsynchronousEngine_ "UIMA-AS version " + UIMAFramework.getVersionString()); } + protected TextMessage createTextMessage() throws ResourceInitializationException { return new ActiveMQTextMessage(); } @@ -232,7 +222,8 @@ public class BaseUIMAAsynchronousEngine_ } private void stopConnection() { - if (sharedConnection != null) { + SharedConnection sharedConnection; + if ((sharedConnection = lookupConnection(brokerURI)) != null) { // Remove a client from registry sharedConnection.unregisterClient(this); // The destroy method closes the JMS connection when @@ -277,8 +268,8 @@ public class BaseUIMAAsynchronousEngine_ sharedConnectionSemaphore.acquire(); stopConnection(); } catch (InterruptedException ex) { - // Force connection stop - stopConnection(); + // Force connection stop + stopConnection(); if (UIMAFramework.getLogger(CLASS_NAME).isLoggable( Level.WARNING)) { UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, @@ -330,6 +321,7 @@ public class BaseUIMAAsynchronousEngine_ private boolean connectionClosedOrInvalid() { synchronized (connectionMux) { + SharedConnection sharedConnection = lookupConnection(brokerURI); if (sharedConnection == null || sharedConnection.getConnection() == null || ((ActiveMQConnection) sharedConnection.getConnection()) @@ -344,11 +336,13 @@ public class BaseUIMAAsynchronousEngine_ return false; } - protected void createSharedConnection(String aBrokerURI) throws Exception { + protected SharedConnection createSharedConnection(String aBrokerURI) throws Exception { + SharedConnection sharedConnection = null; synchronized (connectionMux) { try { // Acquire global static semaphore sharedConnectionSemaphore.acquire(); + sharedConnection = lookupConnection(aBrokerURI); // check the state of a connection if (connectionClosedOrInvalid()) { if (sharedConnection != null @@ -373,6 +367,8 @@ public class BaseUIMAAsynchronousEngine_ sharedConnection = new SharedConnection( new ActiveMQConnectionFactory(aBrokerURI), aBrokerURI); + + sharedConnections.put( aBrokerURI, sharedConnection); // Add AMQ specific connection validator sharedConnection .setConnectionValidator(connectionValidator); @@ -399,6 +395,7 @@ public class BaseUIMAAsynchronousEngine_ } } + return sharedConnection; } private void addPrefetch(ActiveMQConnection aConnection) { @@ -407,13 +404,13 @@ public class BaseUIMAAsynchronousEngine_ ((ActiveMQConnection) aConnection).setPrefetchPolicy(prefetchPolicy); } - private void validateConnection(String aBrokerURI) throws Exception { + private SharedConnection validateConnection(String aBrokerURI) throws Exception { // checks if a sharedConnection exists and if not creates a new one - createSharedConnection(aBrokerURI); + return createSharedConnection(aBrokerURI); } protected Session getSession(String aBrokerURI) throws Exception { - validateConnection(aBrokerURI); + SharedConnection sharedConnection = validateConnection(aBrokerURI); return getSession(sharedConnection.getConnection()); } @@ -423,7 +420,7 @@ public class BaseUIMAAsynchronousEngine_ } protected MessageProducer lookupProducerForEndpoint(Endpoint anEndpoint) throws Exception { - if (sharedConnection == null || producerSession == null) { + if (lookupConnection(brokerURI) == null || producerSession == null) { throw new ResourceInitializationException(); } Destination dest = producerSession.createQueue(anEndpoint.getEndpoint()); @@ -432,7 +429,7 @@ public class BaseUIMAAsynchronousEngine_ protected void initializeProducer(String aBrokerURI, String aQueueName) throws Exception { // Check if a sharedConnection exists. If not it creates one - createSharedConnection(aBrokerURI); + SharedConnection sharedConnection = createSharedConnection(aBrokerURI); synchronized (connectionMux) { initializeProducer(aBrokerURI, aQueueName, sharedConnection.getConnection()); } @@ -503,7 +500,7 @@ public class BaseUIMAAsynchronousEngine_ * @throws Exception */ protected void initializeConsumer(String aBrokerURI) throws Exception { - createSharedConnection(aBrokerURI); + SharedConnection sharedConnection = createSharedConnection(aBrokerURI); synchronized (connectionMux) { initializeConsumer(aBrokerURI, sharedConnection.getConnection()); } @@ -698,6 +695,7 @@ public class BaseUIMAAsynchronousEngine_ // prevent a race condition. createSharedConnection(brokerURI); synchronized (connectionMux) { + SharedConnection sharedConnection = lookupConnection(brokerURI); // Reuse existing JMS connection if available if (sharedConnection != null) { initializeProducer(brokerURI, endpoint, sharedConnection.getConnection()); @@ -876,7 +874,7 @@ public class BaseUIMAAsynchronousEngine_ * */ public void undeploy(String aSpringContainerId, int stop_level) throws Exception { - if (aSpringContainerId == null) { + if (aSpringContainerId == null ) { return; } Modified: uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java?rev=1196753&r1=1196752&r2=1196753&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java (original) +++ uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java Wed Nov 2 18:42:11 2011 @@ -29,6 +29,9 @@ import java.io.Reader; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import javax.jms.Connection; import javax.jms.Message; @@ -40,6 +43,7 @@ import junit.framework.Assert; import org.apache.activemq.ActiveMQMessageConsumer; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.commons.collections.functors.NotNullPredicate; import org.apache.uima.UIMAFramework; import org.apache.uima.UIMA_IllegalStateException; import org.apache.uima.aae.UimaClassFactory; @@ -47,6 +51,7 @@ import org.apache.uima.aae.client.UimaAS import org.apache.uima.aae.client.UimaAsBaseCallbackListener; import org.apache.uima.aae.client.UimaAsynchronousEngine; import org.apache.uima.aae.controller.Endpoint; +import org.apache.uima.aae.error.MessageTimeoutException; import org.apache.uima.aae.error.ServiceShutdownException; import org.apache.uima.adapter.jms.JmsConstants; import org.apache.uima.adapter.jms.activemq.JmsOutputChannel; @@ -65,7 +70,10 @@ import org.apache.uima.resource.Resource import org.apache.uima.resource.ResourceProcessException; import org.apache.uima.resource.ResourceSpecifier; import org.apache.uima.resource.metadata.ProcessingResourceMetaData; +import org.apache.uima.resourceSpecifier.factory.DeploymentDescriptorFactory; +import org.apache.uima.resourceSpecifier.factory.UimaASDeploymentDescriptor; import org.apache.uima.util.XMLInputSource; +import org.josql.expressions.IsNullExpression; public class TestUimaASExtended extends BaseTestSupport { @@ -91,6 +99,81 @@ public class TestUimaASExtended extends + System.getProperty("file.separator") + "bin" + System.getProperty("file.separator") + "dd2spring.xsl"); } + public void testMultipleSyncClientsWithMultipleBrokers() throws Exception { + System.out.println("-------------- testMultipleSyncClientsWithMultipleBrokers -------------"); + + class RunnableClient implements Runnable { + String brokerURL; + BaseTestSupport testSupport; + BaseUIMAAsynchronousEngine_impl uimaAsEngine; + + RunnableClient(String brokerURL,BaseTestSupport testSupport) { + this.brokerURL = brokerURL; + this.testSupport = testSupport; + } + public void initialize(String dd, String serviceEndpoint) throws Exception { + uimaAsEngine = new BaseUIMAAsynchronousEngine_impl(); + // Deploy Uima AS Primitive Service + deployService(uimaAsEngine, dd); + + @SuppressWarnings("unchecked") + Map<String, Object> appCtx = buildContext(brokerURL, serviceEndpoint); + appCtx.put(UimaAsynchronousEngine.Timeout, 1100); + appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100); + testSupport.initialize(uimaAsEngine, appCtx); + waitUntilInitialized(); + } + public void run() { + try { + for (int i = 0; i < 1000; i++) { + CAS cas = uimaAsEngine.getCAS(); + cas.setDocumentText("Some Text"); + System.out.println("UIMA AS Client#"+ Thread.currentThread().getId()+" Sending CAS#"+(i + 1) + " Request to a Service Managed by Broker:"+brokerURL); + try { + uimaAsEngine.sendAndReceiveCAS(cas); + } catch( Exception e) { + e.printStackTrace(); + } finally { + cas.release(); + } + } + System.out.println("Thread:"+Thread.currentThread().getId()+" Completed run()"); + uimaAsEngine.stop(); + } catch( Exception e) { + e.printStackTrace(); + } + + } + + } + + ExecutorService executor = Executors.newCachedThreadPool(); + + // change broker URl in system properties + System.setProperty("BrokerURL", broker.getMasterConnectorURI().toString()); + + RunnableClient client1 = + new RunnableClient(broker.getMasterConnectorURI(), this); + client1.initialize(relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml", "NoOpAnnotatorQueue"); + + final BrokerService broker2 = setupSecondaryBroker(true); + + // change broker URl in system properties + System.setProperty("BrokerURL", broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString()); + + RunnableClient client2 = + new RunnableClient(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(), this); + client2.initialize(relativePath + "/Deploy_NoOpAnnotatorWithPlaceholder.xml", "NoOpAnnotatorQueue"); + + Future<?> f1 = executor.submit(client1); + Future<?> f2 = executor.submit(client2); + f1.get(); + f2.get(); + executor.shutdownNow(); + broker2.stop(); + broker.stop(); + } + /** * Tests service quiesce and stop support. This test sets a CasPool to 1 to send just one CAS at a * time. After the first CAS is sent, a thread is started with a timer to expire before the reply @@ -120,6 +203,7 @@ public class TestUimaASExtended extends spinShutdownThread(eeUimaEngine, 5000, containers, SpringContainerDeployer.QUIESCE_AND_STOP); runTest(appCtx, eeUimaEngine, String.valueOf(broker.getMasterConnectorURI()), "TopLevelTaeQueue", 3, EXCEPTION_LATCH); + eeUimaEngine.stop(); } public void testStopNow() throws Exception { @@ -1243,8 +1327,16 @@ public class TestUimaASExtended extends System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000"); deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml"); deployService(eeUimaEngine, relativePath + "/Deploy_AggregateAnnotator.xml"); - runTest(null, eeUimaEngine, String.valueOf(broker.getMasterConnectorURI()), "TopLevelTaeQueue", - 1, PROCESS_LATCH); + + Map<String, Object> appCtx = buildContext(String.valueOf(broker.getMasterConnectorURI()), + "TopLevelTaeQueue"); + appCtx.put(UimaAsynchronousEngine.Timeout, 0); + appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0); + + addExceptionToignore(org.apache.uima.aae.error.UimaEEServiceException.class); + + runTest(appCtx, eeUimaEngine, String.valueOf(broker.getMasterConnectorURI()), "TopLevelTaeQueue", + 10, PROCESS_LATCH); } public void testDeployAggregateServiceWithDelegateTimeoutAndContinueOnError() throws Exception { Modified: uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java?rev=1196753&r1=1196752&r2=1196753&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java (original) +++ uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java Wed Nov 2 18:42:11 2011 @@ -164,7 +164,7 @@ public abstract class BaseTestSupport ex return false; } - protected void initialize(BaseUIMAAsynchronousEngine_impl eeUimaEngine, Map<String, Object> appCtx) + public void initialize(BaseUIMAAsynchronousEngine_impl eeUimaEngine, Map<String, Object> appCtx) throws Exception { eeUimaEngine.addStatusCallbackListener(listener); eeUimaEngine.initialize(appCtx); Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java?rev=1196753&r1=1196752&r2=1196753&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java (original) +++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java Wed Nov 2 18:42:11 2011 @@ -37,6 +37,7 @@ import org.apache.uima.aae.message.Async import org.apache.uima.aae.message.UimaMessageValidator; import org.apache.uima.adapter.jms.JmsConstants; import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientRequest; +import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.SharedConnection; import org.apache.uima.adapter.jms.message.PendingMessage; import org.apache.uima.cas.CAS; import org.apache.uima.jms.error.handler.BrokerConnectionException; @@ -135,7 +136,7 @@ public abstract class BaseMessageSender * that it will not be sent to the destination due to the fact the broker connection is down. */ private boolean reject(PendingMessage pm ) { - return reject(pm, new BrokerConnectionException("Unable To Deliver Message To Destination. Connection To Broker "+engine.sharedConnection.getBroker()+" Has Been Lost")); + return reject(pm, new BrokerConnectionException("Unable To Deliver Message To Destination. Connection To Broker "+engine.getBrokerURI()+" Has Been Lost")); } /** @@ -146,7 +147,8 @@ public abstract class BaseMessageSender boolean rejectRequest = false; // If the connection to a broker was lost, notify the client // and reject the request unless this is getMeta Ping request. - if ( !engine.sharedConnection.isConnectionValid() ) { + SharedConnection sharedConnection = engine.lookupConnection(engine.getBrokerURI()); + if ( sharedConnection != null && !sharedConnection.isConnectionValid() ) { String messageKind = ""; if (pm.getMessageType() == AsynchAEMessage.GetMeta ) { messageKind = "GetMeta"; @@ -265,6 +267,9 @@ public abstract class BaseMessageSender // special case that we dont reject even though the broker connection has been lost. It is allow to // fall through and will be sent as soon as the connection is recovered. boolean rejectRequest = reject(pm); + if ( !engine.running) { + break; + } // blocks until the connection is re-established with a broker engine.recoverSharedConnectionIfClosed(); // get the producer initialized from a valid connection Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1196753&r1=1196752&r2=1196753&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original) +++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Wed Nov 2 18:42:11 2011 @@ -76,7 +76,6 @@ import org.apache.uima.aae.monitor.stati import org.apache.uima.adapter.jms.ConnectionValidator; import org.apache.uima.adapter.jms.JmsConstants; import org.apache.uima.adapter.jms.message.PendingMessage; -import org.apache.uima.analysis_engine.AnalysisEngineProcessException; import org.apache.uima.cas.CAS; import org.apache.uima.cas.impl.AllowPreexistingFS; import org.apache.uima.cas.impl.XmiSerializationSharedData; @@ -101,7 +100,9 @@ public abstract class BaseUIMAAsynchrono public enum ClientState {INITIALIZING, RUNNING, FAILED, RECONNECTING, STOPPING, STOPPED}; protected ClientState state = ClientState.INITIALIZING; - + + protected String brokerURI = null; + protected static final String SHADOW_CAS_POOL = "ShadowCasPool"; protected static final int MetadataTimeout = 1; @@ -200,7 +201,10 @@ public abstract class BaseUIMAAsynchrono protected volatile boolean producerInitialized; - protected static SharedConnection sharedConnection = null; + protected static ConcurrentHashMap<String, SharedConnection> sharedConnections = + new ConcurrentHashMap<String, SharedConnection>(); + + //protected static SharedConnection sharedConnection = null; protected Thread shutdownHookThread = null; @@ -239,6 +243,15 @@ public abstract class BaseUIMAAsynchrono abstract protected void initializeConsumer(String aBrokerURI, Connection connection) throws Exception; + //abstract protected String getBrokerURI(); + + protected void setBrokeryURI(String brokerURI ) { + this.brokerURI = brokerURI; + } + protected String getBrokerURI() { + return brokerURI; + } + public void addStatusCallbackListener(UimaAsBaseCallbackListener aListener) { if (running) { throw new UIMA_IllegalStateException(JmsConstants.JMS_LOG_RESOURCE_BUNDLE,"UIMAJMS_listener_added_after_initialize__WARNING", new Object[]{}); @@ -830,6 +843,8 @@ public abstract class BaseUIMAAsynchrono // The sendCAS() method is synchronized no need to synchronize the code below if (serviceDelegate.getState() == Delegate.TIMEOUT_STATE ) { + SharedConnection sharedConnection = lookupConnection(getBrokerURI()); + // Send Ping to service as getMeta request if ( !serviceDelegate.isAwaitingPingReply() && sharedConnection.isOpen() ) { serviceDelegate.setAwaitingPingReply(); @@ -861,7 +876,7 @@ public abstract class BaseUIMAAsynchrono return casReferenceId; } else { if ( !requestToCache.isSynchronousInvocation() ) { - Exception exception = new BrokerConnectionException("Unable To Deliver CAS:"+requestToCache.getCasReferenceId()+" To Destination. Connection To Broker "+sharedConnection.getBroker()+" Has Been Lost"); + Exception exception = new BrokerConnectionException("Unable To Deliver CAS:"+requestToCache.getCasReferenceId()+" To Destination. Connection To Broker "+getBrokerURI()+" Has Been Lost"); handleException(exception, requestToCache.getCasReferenceId(), null, requestToCache, true); return casReferenceId; } else { @@ -875,7 +890,7 @@ public abstract class BaseUIMAAsynchrono } } } - + SharedConnection sharedConnection = lookupConnection(getBrokerURI()); if ( !sharedConnection.isOpen() ) { if (requestToCache != null && !requestToCache.isSynchronousInvocation() && aCAS != null ) { aCAS.release(); @@ -926,7 +941,10 @@ public abstract class BaseUIMAAsynchrono * */ public synchronized String sendCAS(CAS aCAS) throws ResourceProcessException { - return this.sendCAS(aCAS, produceNewClientRequestObject()); + if ( !running ) { + throw new ResourceProcessException(new UimaEEServiceException("Uima AS Client Has Been Stopped. Rejecting Request to Process CAS")); + } + return this.sendCAS(aCAS, produceNewClientRequestObject()); } /** @@ -1188,8 +1206,45 @@ public abstract class BaseUIMAAsynchrono String nodeIP = message.getStringProperty(AsynchAEMessage.ServerIP); String pid = message.getStringProperty(AsynchAEMessage.UimaASProcessPID); if ( casReferenceId != null && nodeIP != null && pid != null) { - onBeforeProcessCAS(status,nodeIP, pid); - } + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { + UIMAFramework.getLogger(CLASS_NAME).logrb( + Level.FINE, + CLASS_NAME.getName(), + "handleServiceInfo", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_calling_onBeforeProcessCAS_FINE", + new Object[] { + casReferenceId, + String.valueOf(casCachedRequest.getCAS().hashCode()) + }); + } + onBeforeProcessCAS(status,nodeIP, pid); + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { + UIMAFramework.getLogger(CLASS_NAME).logrb( + Level.FINE, + CLASS_NAME.getName(), + "handleServiceInfo", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_completed_onBeforeProcessCAS_FINE", + new Object[] { + casReferenceId, + String.valueOf(casCachedRequest.getCAS().hashCode()) + }); + } + } else { + UIMAFramework.getLogger(CLASS_NAME).logrb( + Level.INFO, + CLASS_NAME.getName(), + "handleServiceInfo", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_skipping_onBeforeProcessCAS_INFO", + new Object[] { + casReferenceId, + String.valueOf(casCachedRequest.getCAS().hashCode()), + nodeIP, pid + }); + + } } } catch( Exception e) { UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), @@ -2561,7 +2616,8 @@ public abstract class BaseUIMAAsynchrono } public boolean connectionOpen() { - if ( sharedConnection != null ) { + SharedConnection sharedConnection; + if ( (sharedConnection = lookupConnection(getBrokerURI())) != null ) { return sharedConnection.isConnectionValid(); } return false; @@ -2571,7 +2627,9 @@ public abstract class BaseUIMAAsynchrono * when the client is stopped or the connection is recovered. */ public boolean recoverSharedConnectionIfClosed() { + SharedConnection sharedConnection; if ( !connectionOpen() ) { + sharedConnection = lookupConnection(getBrokerURI()); while ( running ) { // blocks until connection is refreshed try { @@ -2611,6 +2669,14 @@ public abstract class BaseUIMAAsynchrono protected void setReleaseCASMessage(TextMessage msg, String aCasReferenceId) throws Exception { } + + protected SharedConnection lookupConnection(String brokerUrl) { + if ( sharedConnections.containsKey(brokerUrl) ) { + return sharedConnections.get(brokerUrl); + } + return null; + } + // This class is used to share JMS Connection by all instances of UIMA AS // client deployed in the same JVM. public static class SharedConnection { @@ -2798,6 +2864,10 @@ public abstract class BaseUIMAAsynchrono * @return */ public boolean destroy() { + return destroy(false); + } + + public boolean destroy(boolean doShutdown) { synchronized(destroyMux) { // Check if all clients have terminated and only than stop the shared connection if (getClientCount() == 0 && connection != null Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties?rev=1196753&r1=1196752&r2=1196753&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties (original) +++ uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties Wed Nov 2 18:42:11 2011 @@ -219,5 +219,6 @@ UIMAJMS_cas_dispatched__INFO= \t>>>>>>> UIMAJMS_cas_reply_rcvd_FINE = \t<<<<<<< UIMA AS Client Received Reply For CAS:{0} Hashcode:{1} UIMAJMS_cas_added_to_pending_FINE = UIMA AS Dispatch Thread Added CAS:{0} Hashcode:{1} To Outstanding List. Current List:\n\n{2}\n\n UIMAJMS_cas_submitted_FINE=UIMA AS sendAndReceive Received CAS:{0} HashCode:{1} For Processing - Forwarding to sendCAS() on Thread:{2} -UIMAJMS_calling_onBeforeProcessCAS_INFO = UIMA AS Client Calling onBeforeMessageProcess For CAS:{0} Hashcode:{1} -UIMAJMS_completed_onBeforeProcessCAS_INFO = UIMA AS Client Completed onBeforeMessageProcess For CAS:{0} Hashcode:{1} +UIMAJMS_calling_onBeforeProcessCAS_FINE = UIMA AS Client Calling onBeforeMessageProcess For CAS:{0} Hashcode:{1} +UIMAJMS_completed_onBeforeProcessCAS_FINE = UIMA AS Client Completed onBeforeMessageProcess For CAS:{0} Hashcode:{1} +UIMAJMS_skipping_onBeforeProcessCAS_INFO= UIMA AS Client Not Calling onBeforeMessageProcess For CAS:{0} Hashcode:{1}. Invalid state: Node: {2} IP: {3} \ No newline at end of file