Author: cwiklik
Date: Wed Nov  2 18:42:11 2011
New Revision: 1196753

URL: http://svn.apache.org/viewvc?rev=1196753&view=rev
Log:
UIMA-1435 Modified client side of UIMA AS to manage multiple connections to 
brokers. Previously only one static connection per JVM was allowed.

Modified:
    
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
    
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
    
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
    
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
    
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
    
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
    
uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java?rev=1196753&r1=1196752&r2=1196753&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
 Wed Nov  2 18:42:11 2011
@@ -157,7 +157,7 @@ public class ActiveMQMessageSender exten
   public MessageProducer getMessageProducer() {
     if ( engine.running && engine.producerInitialized == false  ) {
       try {
-        setConnection(engine.sharedConnection.getConnection());
+        setConnection(engine.lookupConnection(getBrokerURL()).getConnection());
         initializeProducer();
         engine.producerInitialized = true;
       } catch( Exception e) {

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=1196753&r1=1196752&r2=1196753&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
 Wed Nov  2 18:42:11 2011
@@ -30,7 +30,6 @@ import java.util.concurrent.Semaphore;
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.Destination;
-import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
@@ -38,18 +37,13 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.management.ObjectName;
-import javax.naming.Context;
 import javax.naming.InitialContext;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQMessageConsumer;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.RedeliveryPolicy;
 import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQTempDestination;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.UIMA_IllegalArgumentException;
@@ -64,7 +58,6 @@ import org.apache.uima.aae.controller.Co
 import org.apache.uima.aae.controller.ControllerLifecycle;
 import org.apache.uima.aae.controller.Endpoint;
 import org.apache.uima.aae.controller.UimacppServiceController;
-import org.apache.uima.aae.delegate.Delegate;
 import org.apache.uima.aae.delegate.Delegate.DelegateEntry;
 import org.apache.uima.aae.error.AsynchAEException;
 import org.apache.uima.aae.error.UimaASMetaRequestTimeout;
@@ -74,9 +67,7 @@ import org.apache.uima.aae.message.UIMAM
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer;
 import org.apache.uima.adapter.jms.activemq.UimaEEAdminSpringContext;
-import 
org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientRequest;
 import org.apache.uima.adapter.jms.service.Dd2spring;
-import org.apache.uima.analysis_engine.AnalysisEngineDescription;
 import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.impl.UimaVersion;
@@ -98,8 +89,6 @@ public class BaseUIMAAsynchronousEngine_
 
   private MessageProducer producer;
 
-  private String brokerURI = null;
-
   private Session session = null;
 
   private Session consumerSession = null;
@@ -137,6 +126,7 @@ public class BaseUIMAAsynchronousEngine_
             "UIMA-AS version " + UIMAFramework.getVersionString());
   }
 
+
   protected TextMessage createTextMessage() throws 
ResourceInitializationException {
     return new ActiveMQTextMessage();
   }
@@ -232,7 +222,8 @@ public class BaseUIMAAsynchronousEngine_
 
   }
   private void stopConnection() {
-    if (sharedConnection != null) {
+       SharedConnection sharedConnection;
+    if ((sharedConnection = lookupConnection(brokerURI)) != null) {
       // Remove a client from registry
       sharedConnection.unregisterClient(this);
       // The destroy method closes the JMS connection when
@@ -277,8 +268,8 @@ public class BaseUIMAAsynchronousEngine_
                                        sharedConnectionSemaphore.acquire();
                                        stopConnection();
                                } catch (InterruptedException ex) {
-                                 // Force connection stop
-          stopConnection();
+                                   // Force connection stop
+                    stopConnection();
                                        if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(
                                                        Level.WARNING)) {
                                                
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING,
@@ -330,6 +321,7 @@ public class BaseUIMAAsynchronousEngine_
 
        private boolean connectionClosedOrInvalid() {
                synchronized (connectionMux) {
+                       SharedConnection sharedConnection = 
lookupConnection(brokerURI);
                        if (sharedConnection == null
                                        || sharedConnection.getConnection() == 
null
                                        || ((ActiveMQConnection) 
sharedConnection.getConnection())
@@ -344,11 +336,13 @@ public class BaseUIMAAsynchronousEngine_
                return false;
        }
 
-       protected void createSharedConnection(String aBrokerURI) throws 
Exception {
+       protected SharedConnection createSharedConnection(String aBrokerURI) 
throws Exception {
+               SharedConnection sharedConnection = null;
                synchronized (connectionMux) {
                        try {
                                // Acquire global static semaphore
                                sharedConnectionSemaphore.acquire();
+                               sharedConnection = lookupConnection(aBrokerURI);
                                // check the state of a connection
                                if (connectionClosedOrInvalid()) {
                                        if (sharedConnection != null
@@ -373,6 +367,8 @@ public class BaseUIMAAsynchronousEngine_
                                        sharedConnection = new SharedConnection(
                                                        new 
ActiveMQConnectionFactory(aBrokerURI),
                                                        aBrokerURI);
+
+                                       sharedConnections.put( aBrokerURI, 
sharedConnection);
                                        // Add AMQ specific connection validator
                                        sharedConnection
                                                        
.setConnectionValidator(connectionValidator);
@@ -399,6 +395,7 @@ public class BaseUIMAAsynchronousEngine_
                        }
 
                }
+               return sharedConnection;
        }
 
   private void addPrefetch(ActiveMQConnection aConnection) {
@@ -407,13 +404,13 @@ public class BaseUIMAAsynchronousEngine_
     ((ActiveMQConnection) aConnection).setPrefetchPolicy(prefetchPolicy);
   }
 
-  private void validateConnection(String aBrokerURI) throws Exception {
+  private SharedConnection validateConnection(String aBrokerURI) throws 
Exception {
     // checks if a sharedConnection exists and if not creates a new one
-    createSharedConnection(aBrokerURI);
+    return createSharedConnection(aBrokerURI);
   }
 
   protected Session getSession(String aBrokerURI) throws Exception {
-    validateConnection(aBrokerURI);
+       SharedConnection sharedConnection = validateConnection(aBrokerURI);
     return getSession(sharedConnection.getConnection());
   }
 
@@ -423,7 +420,7 @@ public class BaseUIMAAsynchronousEngine_
   }
 
   protected MessageProducer lookupProducerForEndpoint(Endpoint anEndpoint) 
throws Exception {
-    if (sharedConnection == null || producerSession == null) {
+    if (lookupConnection(brokerURI) == null || producerSession == null) {
       throw new ResourceInitializationException();
     }
     Destination dest = producerSession.createQueue(anEndpoint.getEndpoint());
@@ -432,7 +429,7 @@ public class BaseUIMAAsynchronousEngine_
 
   protected void initializeProducer(String aBrokerURI, String aQueueName) 
throws Exception {
     // Check if a sharedConnection exists. If not it creates one
-    createSharedConnection(aBrokerURI);
+    SharedConnection sharedConnection = createSharedConnection(aBrokerURI);
        synchronized (connectionMux) {
            initializeProducer(aBrokerURI, aQueueName, 
sharedConnection.getConnection());
        }
@@ -503,7 +500,7 @@ public class BaseUIMAAsynchronousEngine_
    * @throws Exception
    */
   protected void initializeConsumer(String aBrokerURI) throws Exception {
-    createSharedConnection(aBrokerURI);
+       SharedConnection  sharedConnection = createSharedConnection(aBrokerURI);
        synchronized (connectionMux) {
            initializeConsumer(aBrokerURI, sharedConnection.getConnection());
        }
@@ -698,6 +695,7 @@ public class BaseUIMAAsynchronousEngine_
       // prevent a race condition.
       createSharedConnection(brokerURI);
          synchronized (connectionMux) {
+        SharedConnection sharedConnection = lookupConnection(brokerURI);
         // Reuse existing JMS connection if available
         if (sharedConnection != null) {
           initializeProducer(brokerURI, endpoint, 
sharedConnection.getConnection());
@@ -876,7 +874,7 @@ public class BaseUIMAAsynchronousEngine_
    * 
    */
   public void undeploy(String aSpringContainerId, int stop_level) throws 
Exception {
-    if (aSpringContainerId == null) {
+    if (aSpringContainerId == null  ) {
       return;
     }
 

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java?rev=1196753&r1=1196752&r2=1196753&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
 Wed Nov  2 18:42:11 2011
@@ -29,6 +29,9 @@ import java.io.Reader;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import javax.jms.Connection;
 import javax.jms.Message;
@@ -40,6 +43,7 @@ import junit.framework.Assert;
 import org.apache.activemq.ActiveMQMessageConsumer;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.commons.collections.functors.NotNullPredicate;
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.UIMA_IllegalStateException;
 import org.apache.uima.aae.UimaClassFactory;
@@ -47,6 +51,7 @@ import org.apache.uima.aae.client.UimaAS
 import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
 import org.apache.uima.aae.client.UimaAsynchronousEngine;
 import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.error.MessageTimeoutException;
 import org.apache.uima.aae.error.ServiceShutdownException;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.activemq.JmsOutputChannel;
@@ -65,7 +70,10 @@ import org.apache.uima.resource.Resource
 import org.apache.uima.resource.ResourceProcessException;
 import org.apache.uima.resource.ResourceSpecifier;
 import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
+import org.apache.uima.resourceSpecifier.factory.DeploymentDescriptorFactory;
+import org.apache.uima.resourceSpecifier.factory.UimaASDeploymentDescriptor;
 import org.apache.uima.util.XMLInputSource;
+import org.josql.expressions.IsNullExpression;
 
 public class TestUimaASExtended extends BaseTestSupport {
 
@@ -91,6 +99,81 @@ public class TestUimaASExtended extends 
             + System.getProperty("file.separator") + "bin" + 
System.getProperty("file.separator")
             + "dd2spring.xsl");
   }
+  public void testMultipleSyncClientsWithMultipleBrokers() throws Exception  {
+           System.out.println("-------------- 
testMultipleSyncClientsWithMultipleBrokers -------------");
+           
+           class RunnableClient implements Runnable {
+               String brokerURL;
+               BaseTestSupport testSupport;
+            BaseUIMAAsynchronousEngine_impl uimaAsEngine;
+               
+               RunnableClient(String brokerURL,BaseTestSupport testSupport) {
+                       this.brokerURL = brokerURL;
+                       this.testSupport = testSupport;
+               }
+               public void initialize(String dd, String serviceEndpoint) 
throws Exception {
+                       uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
+                   // Deploy Uima AS Primitive Service
+                   deployService(uimaAsEngine, dd);
+
+                       @SuppressWarnings("unchecked")
+                         Map<String, Object> appCtx = buildContext(brokerURL, 
serviceEndpoint);
+                         appCtx.put(UimaAsynchronousEngine.Timeout, 1100);
+                         appCtx.put(UimaAsynchronousEngine.CpcTimeout, 1100);
+                         testSupport.initialize(uimaAsEngine, appCtx);
+                         waitUntilInitialized();
+               }
+                       public void run() {
+                               try {
+                           for (int i = 0; i < 1000; i++) {
+                                     CAS cas = uimaAsEngine.getCAS();
+                                     cas.setDocumentText("Some Text");
+                                     System.out.println("UIMA AS Client#"+ 
Thread.currentThread().getId()+" Sending CAS#"+(i + 1) + " Request to a Service 
Managed by Broker:"+brokerURL);
+                                     try {
+                                               
uimaAsEngine.sendAndReceiveCAS(cas);
+                                     } catch( Exception e) {
+                                         e.printStackTrace();
+                                     } finally {
+                                       cas.release();
+                                     }
+                                   }
+                                   
System.out.println("Thread:"+Thread.currentThread().getId()+" Completed run()");
+                                   uimaAsEngine.stop();
+                               } catch( Exception e) {
+                                       e.printStackTrace();
+                               }
+
+                       }
+               
+           }
+           
+           ExecutorService executor = Executors.newCachedThreadPool();
+
+           //  change broker URl in system properties
+           System.setProperty("BrokerURL", 
broker.getMasterConnectorURI().toString());
+           
+           RunnableClient client1 = 
+                       new RunnableClient(broker.getMasterConnectorURI(), 
this);
+           client1.initialize(relativePath + 
"/Deploy_NoOpAnnotatorWithPlaceholder.xml", "NoOpAnnotatorQueue");
+
+           final BrokerService broker2 = setupSecondaryBroker(true);
+
+           //  change broker URl in system properties
+           System.setProperty("BrokerURL", 
broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString());
+
+           RunnableClient client2 = 
+                       new 
RunnableClient(broker2.getConnectorByName(DEFAULT_BROKER_URL_KEY_2).getUri().toString(),
 this);
+           client2.initialize(relativePath + 
"/Deploy_NoOpAnnotatorWithPlaceholder.xml", "NoOpAnnotatorQueue");
+
+           Future<?> f1 = executor.submit(client1);
+           Future<?> f2 = executor.submit(client2);
+           f1.get();
+           f2.get();
+           executor.shutdownNow();
+           broker2.stop();
+           broker.stop();
+       }
+  
   /**
    * Tests service quiesce and stop support. This test sets a CasPool to 1 to 
send just one CAS at a
    * time. After the first CAS is sent, a thread is started with a timer to 
expire before the reply
@@ -120,6 +203,7 @@ public class TestUimaASExtended extends 
     spinShutdownThread(eeUimaEngine, 5000, containers, 
SpringContainerDeployer.QUIESCE_AND_STOP);
     runTest(appCtx, eeUimaEngine, 
String.valueOf(broker.getMasterConnectorURI()),
             "TopLevelTaeQueue", 3, EXCEPTION_LATCH);
+    eeUimaEngine.stop();
   }
 
   public void testStopNow() throws Exception {
@@ -1243,8 +1327,16 @@ public class TestUimaASExtended extends 
     System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000");
     deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml");
     deployService(eeUimaEngine, relativePath + 
"/Deploy_AggregateAnnotator.xml");
-    runTest(null, eeUimaEngine, 
String.valueOf(broker.getMasterConnectorURI()), "TopLevelTaeQueue",
-            1, PROCESS_LATCH);
+    
+    Map<String, Object> appCtx = 
buildContext(String.valueOf(broker.getMasterConnectorURI()),
+            "TopLevelTaeQueue");
+    appCtx.put(UimaAsynchronousEngine.Timeout, 0);
+    appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 0);
+    
+    
addExceptionToignore(org.apache.uima.aae.error.UimaEEServiceException.class); 
+    
+    runTest(appCtx, eeUimaEngine, 
String.valueOf(broker.getMasterConnectorURI()), "TopLevelTaeQueue",
+            10, PROCESS_LATCH);
   }
 
   public void 
testDeployAggregateServiceWithDelegateTimeoutAndContinueOnError() throws 
Exception {

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java?rev=1196753&r1=1196752&r2=1196753&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
 Wed Nov  2 18:42:11 2011
@@ -164,7 +164,7 @@ public abstract class BaseTestSupport ex
     return false;
   }
 
-  protected void initialize(BaseUIMAAsynchronousEngine_impl eeUimaEngine, 
Map<String, Object> appCtx)
+  public void initialize(BaseUIMAAsynchronousEngine_impl eeUimaEngine, 
Map<String, Object> appCtx)
           throws Exception {
     eeUimaEngine.addStatusCallbackListener(listener);
     eeUimaEngine.initialize(appCtx);

Modified: 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java?rev=1196753&r1=1196752&r2=1196753&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseMessageSender.java
 Wed Nov  2 18:42:11 2011
@@ -37,6 +37,7 @@ import org.apache.uima.aae.message.Async
 import org.apache.uima.aae.message.UimaMessageValidator;
 import org.apache.uima.adapter.jms.JmsConstants;
 import 
org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientRequest;
+import 
org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.SharedConnection;
 import org.apache.uima.adapter.jms.message.PendingMessage;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.jms.error.handler.BrokerConnectionException;
@@ -135,7 +136,7 @@ public abstract class BaseMessageSender 
    * that it will not be sent to the destination due to the fact the broker 
connection is down.
    */
   private boolean reject(PendingMessage pm ) {
-    return reject(pm, new BrokerConnectionException("Unable To Deliver Message 
To Destination. Connection To Broker "+engine.sharedConnection.getBroker()+" 
Has Been Lost"));
+    return reject(pm, new BrokerConnectionException("Unable To Deliver Message 
To Destination. Connection To Broker "+engine.getBrokerURI()+" Has Been Lost"));
   }
   
   /**
@@ -146,7 +147,8 @@ public abstract class BaseMessageSender 
     boolean rejectRequest = false;
     //  If the connection to a broker was lost, notify the client
     //  and reject the request unless this is getMeta Ping request.
-    if ( !engine.sharedConnection.isConnectionValid() ) {
+    SharedConnection sharedConnection = 
engine.lookupConnection(engine.getBrokerURI());
+    if ( sharedConnection != null && !sharedConnection.isConnectionValid() ) {
       String messageKind = "";
       if (pm.getMessageType() == AsynchAEMessage.GetMeta ) {
         messageKind = "GetMeta";
@@ -265,6 +267,9 @@ public abstract class BaseMessageSender 
       //  special case that we dont reject even though the broker connection 
has been lost. It is allow to
       //  fall through and will be sent as soon as the connection is recovered.
       boolean rejectRequest = reject(pm);
+      if ( !engine.running) {
+         break;
+      }
       //  blocks until the connection is re-established with a broker
       engine.recoverSharedConnectionIfClosed();
       //  get the producer initialized from a valid connection

Modified: 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1196753&r1=1196752&r2=1196753&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
 Wed Nov  2 18:42:11 2011
@@ -76,7 +76,6 @@ import org.apache.uima.aae.monitor.stati
 import org.apache.uima.adapter.jms.ConnectionValidator;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.message.PendingMessage;
-import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.cas.impl.AllowPreexistingFS;
 import org.apache.uima.cas.impl.XmiSerializationSharedData;
@@ -101,7 +100,9 @@ public abstract class BaseUIMAAsynchrono
   public enum ClientState {INITIALIZING, RUNNING, FAILED, RECONNECTING, 
STOPPING, STOPPED};
   
   protected ClientState state = ClientState.INITIALIZING;
-  
+
+  protected String brokerURI = null;
+
   protected static final String SHADOW_CAS_POOL = "ShadowCasPool";
 
   protected static final int MetadataTimeout = 1;
@@ -200,7 +201,10 @@ public abstract class BaseUIMAAsynchrono
 
   protected volatile boolean producerInitialized;
 
-  protected static SharedConnection sharedConnection = null;
+  protected static ConcurrentHashMap<String, SharedConnection> 
sharedConnections =
+                 new ConcurrentHashMap<String, SharedConnection>();
+  
+  //protected static SharedConnection sharedConnection = null;
 
   protected Thread shutdownHookThread = null;
 
@@ -239,6 +243,15 @@ public abstract class BaseUIMAAsynchrono
   
   abstract protected void initializeConsumer(String aBrokerURI, Connection 
connection) throws Exception;
 
+  //abstract protected  String getBrokerURI();
+
+  protected void setBrokeryURI(String brokerURI ) {
+         this.brokerURI = brokerURI;
+  }
+  protected String getBrokerURI() {
+         return brokerURI;
+  }
+  
   public void addStatusCallbackListener(UimaAsBaseCallbackListener aListener) {
     if (running) {
           throw new 
UIMA_IllegalStateException(JmsConstants.JMS_LOG_RESOURCE_BUNDLE,"UIMAJMS_listener_added_after_initialize__WARNING",
 new Object[]{});
@@ -830,6 +843,8 @@ public abstract class BaseUIMAAsynchrono
 
         // The sendCAS() method is synchronized no need to synchronize the 
code below
         if (serviceDelegate.getState() == Delegate.TIMEOUT_STATE ) {
+          SharedConnection sharedConnection = lookupConnection(getBrokerURI());
+
           //  Send Ping to service as getMeta request
           if ( !serviceDelegate.isAwaitingPingReply() && 
sharedConnection.isOpen() ) {
             serviceDelegate.setAwaitingPingReply();
@@ -861,7 +876,7 @@ public abstract class BaseUIMAAsynchrono
             return casReferenceId;
           } else {
             if ( !requestToCache.isSynchronousInvocation() ) {
-              Exception exception = new BrokerConnectionException("Unable To 
Deliver CAS:"+requestToCache.getCasReferenceId()+" To Destination. Connection 
To Broker "+sharedConnection.getBroker()+" Has Been Lost");
+              Exception exception = new BrokerConnectionException("Unable To 
Deliver CAS:"+requestToCache.getCasReferenceId()+" To Destination. Connection 
To Broker "+getBrokerURI()+" Has Been Lost");
               handleException(exception, requestToCache.getCasReferenceId(), 
null, requestToCache, true);
               return casReferenceId;
             } else {
@@ -875,7 +890,7 @@ public abstract class BaseUIMAAsynchrono
             }
           }
         }
-
+        SharedConnection sharedConnection = lookupConnection(getBrokerURI());
         if ( !sharedConnection.isOpen() ) {
           if (requestToCache != null && 
!requestToCache.isSynchronousInvocation() && aCAS != null ) {
             aCAS.release();
@@ -926,7 +941,10 @@ public abstract class BaseUIMAAsynchrono
    * 
    */
   public synchronized String sendCAS(CAS aCAS) throws ResourceProcessException 
{
-    return this.sendCAS(aCAS, produceNewClientRequestObject());
+    if ( !running ) {
+       throw new ResourceProcessException(new UimaEEServiceException("Uima AS 
Client Has Been Stopped. Rejecting Request to Process CAS"));
+    }
+         return this.sendCAS(aCAS, produceNewClientRequestObject());
   }
 
   /**
@@ -1188,8 +1206,45 @@ public abstract class BaseUIMAAsynchrono
              String nodeIP = 
message.getStringProperty(AsynchAEMessage.ServerIP);
              String pid = 
message.getStringProperty(AsynchAEMessage.UimaASProcessPID);
              if ( casReferenceId != null && nodeIP != null && pid != null) {
-               onBeforeProcessCAS(status,nodeIP, pid);
-             }
+                 if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                      UIMAFramework.getLogger(CLASS_NAME).logrb(
+                              Level.FINE,
+                              CLASS_NAME.getName(),
+                              "handleServiceInfo",
+                              JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                              "UIMAJMS_calling_onBeforeProcessCAS_FINE",
+                              new Object[] {
+                                 casReferenceId,
+                                
String.valueOf(casCachedRequest.getCAS().hashCode())
+                              });
+                  }
+                 onBeforeProcessCAS(status,nodeIP, pid);
+                 if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                      UIMAFramework.getLogger(CLASS_NAME).logrb(
+                              Level.FINE,
+                              CLASS_NAME.getName(),
+                              "handleServiceInfo",
+                              JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                              "UIMAJMS_completed_onBeforeProcessCAS_FINE",
+                              new Object[] {
+                                 casReferenceId,
+                                
String.valueOf(casCachedRequest.getCAS().hashCode())
+                              });
+                  }
+            } else {
+                 UIMAFramework.getLogger(CLASS_NAME).logrb(
+                         Level.INFO,
+                         CLASS_NAME.getName(),
+                         "handleServiceInfo",
+                         JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                         "UIMAJMS_skipping_onBeforeProcessCAS_INFO",
+                         new Object[] {
+                                 casReferenceId,
+                                
String.valueOf(casCachedRequest.getCAS().hashCode()),
+                                nodeIP, pid 
+                         });
+                
+            }
        }
     } catch( Exception e) {
       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
getClass().getName(),
@@ -2561,7 +2616,8 @@ public abstract class BaseUIMAAsynchrono
   }
 
   public boolean connectionOpen() {
-    if ( sharedConnection != null ) {
+       SharedConnection sharedConnection;
+    if ( (sharedConnection = lookupConnection(getBrokerURI())) != null ) {
       return sharedConnection.isConnectionValid();
     }
     return false;
@@ -2571,7 +2627,9 @@ public abstract class BaseUIMAAsynchrono
    * when the client is stopped or the connection is recovered.
    */
   public boolean recoverSharedConnectionIfClosed() {
+       SharedConnection sharedConnection;
     if ( !connectionOpen() ) {
+      sharedConnection = lookupConnection(getBrokerURI());
       while ( running ) {
         //  blocks until connection is refreshed 
         try {
@@ -2611,6 +2669,14 @@ public abstract class BaseUIMAAsynchrono
   protected void setReleaseCASMessage(TextMessage msg, String aCasReferenceId) 
throws Exception {
   }
 
+  
+  protected SharedConnection lookupConnection(String brokerUrl) {
+         if ( sharedConnections.containsKey(brokerUrl) ) {
+                 return sharedConnections.get(brokerUrl);
+         }
+         return null;
+  }
+  
   // This class is used to share JMS Connection by all instances of UIMA AS
   // client deployed in the same JVM.
   public static class SharedConnection {
@@ -2798,6 +2864,10 @@ public abstract class BaseUIMAAsynchrono
      * @return
      */
     public boolean destroy() {
+       return destroy(false);
+    }
+    
+    public boolean destroy(boolean doShutdown) {
       synchronized(destroyMux) {
         //  Check if all clients have terminated and only than stop the shared 
connection
         if (getClientCount() == 0 && connection != null

Modified: 
uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties?rev=1196753&r1=1196752&r2=1196753&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
 Wed Nov  2 18:42:11 2011
@@ -219,5 +219,6 @@ UIMAJMS_cas_dispatched__INFO= \t>>>>>>> 
 UIMAJMS_cas_reply_rcvd_FINE = \t<<<<<<< UIMA AS Client Received Reply For 
CAS:{0} Hashcode:{1}
 UIMAJMS_cas_added_to_pending_FINE = UIMA AS Dispatch Thread Added CAS:{0} 
Hashcode:{1} To Outstanding List. Current List:\n\n{2}\n\n
 UIMAJMS_cas_submitted_FINE=UIMA AS sendAndReceive Received CAS:{0} 
HashCode:{1} For Processing - Forwarding to sendCAS() on Thread:{2}
-UIMAJMS_calling_onBeforeProcessCAS_INFO = UIMA AS Client Calling 
onBeforeMessageProcess For CAS:{0} Hashcode:{1}
-UIMAJMS_completed_onBeforeProcessCAS_INFO = UIMA AS Client Completed 
onBeforeMessageProcess For CAS:{0} Hashcode:{1}
+UIMAJMS_calling_onBeforeProcessCAS_FINE = UIMA AS Client Calling 
onBeforeMessageProcess For CAS:{0} Hashcode:{1}
+UIMAJMS_completed_onBeforeProcessCAS_FINE = UIMA AS Client Completed 
onBeforeMessageProcess For CAS:{0} Hashcode:{1}
+UIMAJMS_skipping_onBeforeProcessCAS_INFO= UIMA AS Client Not Calling 
onBeforeMessageProcess For CAS:{0} Hashcode:{1}. Invalid state: Node: {2} IP: 
{3}
\ No newline at end of file


Reply via email to