Author: cwiklik Date: Thu Oct 20 17:13:38 2011 New Revision: 1186933 URL: http://svn.apache.org/viewvc?rev=1186933&view=rev Log: UIMA-2251 Fixed code responsible for recovery of delegate listener when send() fails
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=1186933&r1=1186932&r2=1186933&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original) +++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Thu Oct 20 17:13:38 2011 @@ -316,7 +316,7 @@ public class JmsEndpointConnection_impl open(delegateEndpoint.getEndpoint(), serverUri); } - public synchronized void open(String brokerUri, String anEndpointName) throws AsynchAEException, + public synchronized void open( String anEndpointName, String brokerUri) throws AsynchAEException, ServiceShutdownException { if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "open", @@ -545,16 +545,6 @@ public class JmsEndpointConnection_impl map.put(AsynchAEMessage.CasReference, aMessage.getStringProperty(AsynchAEMessage.CasReference)); map.put(AsynchAEMessage.Endpoint, masterEndpoint); Exception e = new DelegateConnectionLostException("Controller:"+controller.getComponentName()+" Lost Connection to "+target+ ":"+masterEndpoint.getDelegateKey()); - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { - - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), - "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAEE_service_exception_WARNING", controller.getComponentName()); - - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), - "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAEE_exception__WARNING", e); - } // Handle error in ProcessErrorHandler ((BaseAnalysisEngineController)controller).handleError(map, e); return true; // return true as if this was successful send @@ -567,9 +557,6 @@ public class JmsEndpointConnection_impl return true; } } - - // Send a reply to a queue provided by the client - // Stop messages and replies are sent to the endpoint provided in the destination object if ((command == AsynchAEMessage.Stop || command == AsynchAEMessage.ReleaseCAS || isReplyEndpoint) && delegateEndpoint.getDestination() != null) { @@ -605,6 +592,7 @@ public class JmsEndpointConnection_impl brokerDestinations.getConnectionTimer().startTimer(connectionCreationTimestamp, delegateEndpoint); } + // Succeeded sending the CAS return true; } catch (Exception e) { Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1186933&r1=1186932&r2=1186933&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original) +++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Thu Oct 20 17:13:38 2011 @@ -501,11 +501,14 @@ public class JmsOutputChannel implements if ( iC != null ) { try { // Create a new Listener, new Temp Queue and associate the listener with the Input Channel + // Also resets endpoint status to OK iC.createListener(anEndpoint.getDelegateKey(), anEndpoint); iC.removeDelegateFromFailedList(masterEndpoint.getDelegateKey()); } catch( Exception exx) { throw new AsynchAEException(exx); } + } else{ + throw new AsynchAEException("Aggregate:"+getAnalysisEngineController()+" Has not yet recovered a listener for delegate: "+anEndpoint.getDelegateKey()); } } else if ( !masterEndpoint.isFreeCasEndpoint() ) { // In case this thread blocked while the reply queue listener was created, make sure @@ -1428,8 +1431,23 @@ public class JmsOutputChannel implements // Failure on sending a request requires cleanup that includes stopping a listener // on the delegate that we were unable to send a message to. The delegate state is // set to FAILED. If there are retries or more CASes to send to this delegate the - // connection will be retried. + // connection will be retried. The error handler called from the send() above already + // removed the CAS from outstanding list. if (isRequest && anEndpoint.getDelegateKey() != null) { + Endpoint master= ((AggregateAnalysisEngineController) getAnalysisEngineController()).lookUpEndpoint(anEndpoint.getDelegateKey(), false); + master.setStatus(Endpoint.FAILED); + + String key = getLookupKey(anEndpoint); + if (connectionMap.containsKey(brokerConnectionURL)) { + // First get a Map containing destinations managed by a broker provided by the client + BrokerConnectionEntry brokerConnectionEntry = (BrokerConnectionEntry) connectionMap.get(brokerConnectionURL); + // check if the broker connection entry contains an endpoint that we failed on while + // sending a message. If it exits, remove the endpoint to allow recovery to work on + // a subsequent message. + if (brokerConnectionEntry.endpointExists(key)) { + brokerConnectionEntry.removeEndpoint(key); + } + } // Spin recovery thread to handle send error. After the recovery thread // is started the current (process) thread goes back to a thread pool in // ThreadPoolExecutor. The recovery thread can than stop the listener and the Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties?rev=1186933&r1=1186932&r2=1186933&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties (original) +++ uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties Thu Oct 20 17:13:38 2011 @@ -211,4 +211,13 @@ UIMAJMS_service_not_responding_to_ping__ UIMAJMS_starting_listener__INFO=Controller: {0} Starting Listener on Endpoint: {1} Selector: {2} Broker: {3} UIMAJMS_caught_signal__INFO= Uima AS Service {0} Caught Kill Signal - Initiating Quiesce and Stop UIMAJMS_listener_added_after_initialize__WARNING = UIMA AS Already Initialized - Attempt to Add Callback Listener Failed. Add Callback Listener Before calling initialize(). -UIMAJMS_client_interrupted_INFO= UIMA AS Client Thread Interrupted While Waiting For a Reply. CAS: {0} CasHashCode: {1} \ No newline at end of file +UIMAJMS_client_interrupted_INFO= UIMA AS Client Thread Interrupted While Waiting For a Reply. CAS: {0} CasHashCode: {1} +UIMAJMS_failed_cache_lookup__WARNING= UIMA AS Client Failed Cache Look Up For CAS: {0} Command:{1} Message:{2} Destination:{3} +UIMAJMS_calling_onBeforeMessageSend__INFO= UIMA AS Client Calling onBeforeMessageSend - CAS:{0} CasHashCode:{1} +UIMAJMS_completed_onBeforeMessageSend__INFO= UIMA AS Client Completed onBeforeMessageSend - CAS:{0} CasHashCode:{1} +UIMAJMS_cas_dispatched__INFO= \t>>>>>>> UIMA AS Client Delivered CAS: {0} HashCode:{1} to Endpoint:{2} +UIMAJMS_cas_reply_rcvd_INFO = \t<<<<<<< UIMA AS Client Received Reply For CAS:{0} Hashcode:{1} +UIMAJMS_cas_added_to_pending_INFO = UIMA AS Dispatch Thread Added CAS:{0} Hashcode:{1} To Outstanding List. Current List:\n\n{2}\n\n +UIMAJMS_cas_submitted_FINE=UIMA AS sendAndReceive Received CAS:{0} HashCode:{1} For Processing - Forwarding to sendCAS() on Thread:{2} +UIMAJMS_calling_onBeforeProcessCAS_INFO = UIMA AS Client Calling onBeforeMessageProcess For CAS:{0} Hashcode:{1} +UIMAJMS_completed_onBeforeProcessCAS_INFO = UIMA AS Client Completed onBeforeMessageProcess For CAS:{0} Hashcode:{1}