Author: cwiklik
Date: Thu Oct 20 17:13:38 2011
New Revision: 1186933

URL: http://svn.apache.org/viewvc?rev=1186933&view=rev
Log:
UIMA-2251 Fixed code responsible for recovery of delegate listener when send() 
fails

Modified:
    
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
    
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
    
uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=1186933&r1=1186932&r2=1186933&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
 Thu Oct 20 17:13:38 2011
@@ -316,7 +316,7 @@ public class JmsEndpointConnection_impl 
     open(delegateEndpoint.getEndpoint(), serverUri);
   }
 
-  public synchronized void open(String brokerUri, String anEndpointName) 
throws AsynchAEException,
+  public synchronized void open( String anEndpointName, String brokerUri) 
throws AsynchAEException,
           ServiceShutdownException {
     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, 
CLASS_NAME.getName(), "open",
@@ -545,16 +545,6 @@ public class JmsEndpointConnection_impl 
           map.put(AsynchAEMessage.CasReference, 
aMessage.getStringProperty(AsynchAEMessage.CasReference));
           map.put(AsynchAEMessage.Endpoint, masterEndpoint);
           Exception e = new 
DelegateConnectionLostException("Controller:"+controller.getComponentName()+" 
Lost Connection to "+target+ ":"+masterEndpoint.getDelegateKey());
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-            
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
-                    "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAEE_service_exception_WARNING", 
controller.getComponentName());
-
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
getClass().getName(),
-                    "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAEE_exception__WARNING", e);
-          }
           //  Handle error in ProcessErrorHandler
           ((BaseAnalysisEngineController)controller).handleError(map, e);
           return true; // return true as if this was successful send 
@@ -567,9 +557,6 @@ public class JmsEndpointConnection_impl 
           return true;
         }
       }
-
-      // Send a reply to a queue provided by the client
-
       // Stop messages and replies are sent to the endpoint provided in the 
destination object
       if ((command == AsynchAEMessage.Stop || command == 
AsynchAEMessage.ReleaseCAS || isReplyEndpoint)
               && delegateEndpoint.getDestination() != null) {
@@ -605,6 +592,7 @@ public class JmsEndpointConnection_impl 
         
brokerDestinations.getConnectionTimer().startTimer(connectionCreationTimestamp,
                 delegateEndpoint);
       }
+      
       // Succeeded sending the CAS
       return true;
     } catch (Exception e) {

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1186933&r1=1186932&r2=1186933&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
 Thu Oct 20 17:13:38 2011
@@ -501,11 +501,14 @@ public class JmsOutputChannel implements
                 if ( iC != null ) { 
                   try {
                     // Create a new Listener, new Temp Queue and associate the 
listener with the Input Channel
+                       // Also resets endpoint status to OK  
                     iC.createListener(anEndpoint.getDelegateKey(), anEndpoint);
                     
iC.removeDelegateFromFailedList(masterEndpoint.getDelegateKey());
                   } catch( Exception exx) { 
                     throw new AsynchAEException(exx);
                   }
+                } else{
+                       throw new 
AsynchAEException("Aggregate:"+getAnalysisEngineController()+" Has not yet 
recovered a listener for delegate: "+anEndpoint.getDelegateKey());
                 }
               } else if ( !masterEndpoint.isFreeCasEndpoint() ) {
                 //  In case this thread blocked while the reply queue listener 
was created, make sure
@@ -1428,8 +1431,23 @@ public class JmsOutputChannel implements
       // Failure on sending a request requires cleanup that includes stopping 
a listener
       // on the delegate that we were unable to send a message to. The 
delegate state is
       // set to FAILED. If there are retries or more CASes to send to this 
delegate the
-      // connection will be retried.
+      // connection will be retried. The error handler called from the send() 
above already
+      // removed the CAS from outstanding list.
       if (isRequest && anEndpoint.getDelegateKey() != null) {
+         Endpoint master= ((AggregateAnalysisEngineController) 
getAnalysisEngineController()).lookUpEndpoint(anEndpoint.getDelegateKey(), 
false);
+         master.setStatus(Endpoint.FAILED);
+         
+         String key = getLookupKey(anEndpoint);
+           if (connectionMap.containsKey(brokerConnectionURL)) {
+                 // First get a Map containing destinations managed by a 
broker provided by the client
+             BrokerConnectionEntry brokerConnectionEntry = 
(BrokerConnectionEntry) connectionMap.get(brokerConnectionURL);
+             // check if the broker connection entry contains an endpoint that 
we failed on while
+             // sending a message. If it exists, remove the endpoint to allow 
recovery to work on 
+             // a subsequent message. 
+             if (brokerConnectionEntry.endpointExists(key)) {
+               brokerConnectionEntry.removeEndpoint(key);
+             }
+           } 
         // Spin recovery thread to handle send error. After the recovery thread
         // is started the current (process) thread goes back to a thread pool 
in
         // ThreadPoolExecutor. The recovery thread can then stop the listener 
and the

Modified: 
uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties?rev=1186933&r1=1186932&r2=1186933&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
 Thu Oct 20 17:13:38 2011
@@ -211,4 +211,13 @@ UIMAJMS_service_not_responding_to_ping__
 UIMAJMS_starting_listener__INFO=Controller: {0} Starting Listener on Endpoint: 
{1} Selector: {2} Broker: {3}
 UIMAJMS_caught_signal__INFO= Uima AS Service {0} Caught Kill Signal - 
Initiating Quiesce and Stop
 UIMAJMS_listener_added_after_initialize__WARNING = UIMA AS Already Initialized 
- Attempt to Add Callback Listener Failed. Add Callback Listener Before calling 
initialize().
-UIMAJMS_client_interrupted_INFO= UIMA AS Client Thread Interrupted While 
Waiting For a Reply. CAS: {0} CasHashCode: {1}
\ No newline at end of file
+UIMAJMS_client_interrupted_INFO= UIMA AS Client Thread Interrupted While 
Waiting For a Reply. CAS: {0} CasHashCode: {1}
+UIMAJMS_failed_cache_lookup__WARNING= UIMA AS Client Failed Cache Look Up For 
CAS: {0} Command:{1} Message:{2} Destination:{3}
+UIMAJMS_calling_onBeforeMessageSend__INFO= UIMA AS Client Calling 
onBeforeMessageSend - CAS:{0} CasHashCode:{1}
+UIMAJMS_completed_onBeforeMessageSend__INFO= UIMA AS Client Completed 
onBeforeMessageSend - CAS:{0} CasHashCode:{1}
+UIMAJMS_cas_dispatched__INFO= \t>>>>>>> UIMA AS Client Delivered CAS: {0} 
HashCode:{1} to Endpoint:{2}
+UIMAJMS_cas_reply_rcvd_INFO = \t<<<<<<< UIMA AS Client Received Reply For 
CAS:{0} Hashcode:{1}
+UIMAJMS_cas_added_to_pending_INFO = UIMA AS Dispatch Thread Added CAS:{0} 
Hashcode:{1} To Outstanding List. Current List:\n\n{2}\n\n
+UIMAJMS_cas_submitted_FINE=UIMA AS sendAndReceive Received CAS:{0} 
HashCode:{1} For Processing - Forwarding to sendCAS() on Thread:{2}
+UIMAJMS_calling_onBeforeProcessCAS_INFO = UIMA AS Client Calling 
onBeforeMessageProcess For CAS:{0} Hashcode:{1}
+UIMAJMS_completed_onBeforeProcessCAS_INFO = UIMA AS Client Completed 
onBeforeMessageProcess For CAS:{0} Hashcode:{1}


Reply via email to