Author: cwiklik Date: Wed May 30 19:27:36 2012 New Revision: 1344426 URL: http://svn.apache.org/viewvc?rev=1344426&view=rev Log: UIMA-2414 Refactored code to avoid ConcurrentModificationException while handling ServiceInfo message.
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Modified: uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=1344426&r1=1344425&r2=1344426&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original) +++ uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Wed May 30 19:27:36 2012 @@ -1092,15 +1092,18 @@ public class BaseUIMAAsynchronousEngine_ * */ public void stopProducingCases() { - List<DelegateEntry> outstandingCasList = serviceDelegate.getDelegateCasesPendingReply(); - for (DelegateEntry entry : outstandingCasList) { - // The Cas is still being processed - ClientRequest clientCachedRequest = (ClientRequest) clientCache - .get(entry.getCasReferenceId()); - if (clientCachedRequest != null && !clientCachedRequest.isMetaRequest() - && clientCachedRequest.getCasReferenceId() != null) { - stopProducingCases(clientCachedRequest); - } + + String[] casIdsPendingReply = serviceDelegate.getDelegateCasIdsPendingReply(); + if ( casIdsPendingReply != null && casIdsPendingReply.length > 0 ) { + for( String casReferenceId : casIdsPendingReply ) { + // The Cas is still being processed + ClientRequest clientCachedRequest = (ClientRequest) clientCache + .get(casReferenceId); + if (clientCachedRequest != null && !clientCachedRequest.isMetaRequest() + && clientCachedRequest.getCasReferenceId() != null) { + stopProducingCases(casReferenceId, clientCachedRequest.getFreeCasNotificationQueue()); + } + } } } @@ -1114,31 +1117,31 @@ public class BaseUIMAAsynchronousEngine_ // The Cas is still being processed ClientRequest clientCachedRequest = (ClientRequest) clientCache.get(aCasReferenceId); if (clientCachedRequest != null) { - stopProducingCases(clientCachedRequest); + stopProducingCases(aCasReferenceId, clientCachedRequest.getFreeCasNotificationQueue()); } } - private void stopProducingCases(ClientRequest clientCachedRequest) { +// private void stopProducingCases(ClientRequest clientCachedRequest) { + private void stopProducingCases(String casReferenceId, Destination cmFreeCasQueue) { try { - if (clientCachedRequest.getFreeCasNotificationQueue() != null) { +// if (clientCachedRequest.getFreeCasNotificationQueue() != null) { + if (cmFreeCasQueue != null) { TextMessage msg = createTextMessage(); msg.setText(""); msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None); - msg - .setStringProperty(AsynchAEMessage.CasReference, clientCachedRequest - .getCasReferenceId()); +// msg.setStringProperty(AsynchAEMessage.CasReference, clientCachedRequest.getCasReferenceId()); + msg.setStringProperty(AsynchAEMessage.CasReference, casReferenceId); msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request); msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Stop); msg.setStringProperty(UIMAMessage.ServerURI, brokerURI); try { - MessageProducer msgProducer = getMessageProducer(clientCachedRequest - .getFreeCasNotificationQueue()); + MessageProducer msgProducer = getMessageProducer(cmFreeCasQueue); if (msgProducer != null) { if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_client_sending_stop_to_service__INFO", new Object[] {clientCachedRequest.getCasReferenceId(),clientCachedRequest.getFreeCasNotificationQueue()}); + "UIMAJMS_client_sending_stop_to_service__INFO", new Object[] {casReferenceId,cmFreeCasQueue}); } // Send STOP message to Cas Multiplier Service msgProducer.send(msg); Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1344426&r1=1344425&r2=1344426&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java (original) +++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java Wed May 30 19:27:36 2012 @@ -2150,21 +2150,18 @@ public abstract class BaseAnalysisEngine // Fetch the Delegate object corresponding to the current key Delegate delegate = ((AggregateAnalysisEngineController) this) .lookupDelegate((String) entry.getKey()); - if (delegate != null) { // Get a list of all CASes this aggregate has dispatched to the Cas Multiplier - List<DelegateEntry> pendingList = delegate.getDelegateCasesPendingReply(); - if (pendingList != null) { - Iterator<DelegateEntry> it2 = pendingList.iterator(); + String[] pendingReplyCasIds = delegate.getDelegateCasIdsPendingReply(); + if (pendingReplyCasIds != null && pendingReplyCasIds.length > 0 ) { // For each CAS pending reply send a Stop message to the CM - while (it2.hasNext()) { - DelegateEntry delegateEntry = it2.next(); + for(String casReferenceId : pendingReplyCasIds ) { if (endpoint.isRemote()) { - stopCasMultiplier(delegate, delegateEntry.getCasReferenceId()); + stopCasMultiplier(delegate, casReferenceId); } else { AnalysisEngineController delegateCasMultiplier = lookupDelegateController(endpoint .getEndpoint()); - delegateCasMultiplier.addAbortedCasReferenceId(delegateEntry.getCasReferenceId()); + delegateCasMultiplier.addAbortedCasReferenceId(casReferenceId); } } } Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java?rev=1344426&r1=1344425&r2=1344426&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java (original) +++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java Wed May 30 19:27:36 2012 @@ -197,8 +197,21 @@ public abstract class Delegate { } } - public List<DelegateEntry> getDelegateCasesPendingReply() { - return outstandingCasList; + /** + * Returns an array of Cas Reference Ids representing all CASes pending + * reply. + * + * @return - String[] - ids of pending CASes + */ + public String[] getDelegateCasIdsPendingReply() { + synchronized (outstandingCasList) { + String[] casIdsPendingReply = new String[outstandingCasList.size()]; + int inx=0; + for (DelegateEntry entry : outstandingCasList) { + casIdsPendingReply[inx++] = entry.getCasReferenceId(); + } + return casIdsPendingReply; + } } public List<DelegateEntry> getDelegateCasesPendingDispatch() { @@ -488,6 +501,22 @@ public abstract class Delegate { } /** + * Returns true if a given CAS is found in the outstandingCasList. Returns false otherwise + * + * @param aCasReferenceId + * - id of a CAS to find in an outstandingCasList + */ + public boolean isCasPendingReply(String aCasReferenceId) { + synchronized (outstandingCasList) { + DelegateEntry entry = lookupEntry(aCasReferenceId, outstandingCasList); + if (entry != null) { + return true; + } + } + return false; + } + + /** * Removes {@link DelegateEntry} from the list of CASes pending reply. The entry is removed when * either: 1) reply is received from the delegate before the timeout 2) the timeout occurs with no * retry 3) an error occurs and the CAS is dropped as part of Error Handling Modified: uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties?rev=1344426&r1=1344425&r2=1344426&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties (original) +++ uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties Wed May 30 19:27:36 2012 @@ -55,7 +55,7 @@ UIMAEE_terminating_service__INFO = Servi UIMAEE_primitive_ctrl_init_info__CONFIG - Initializing Async Primitive Service. Number of Worker Threads: {0} UIMAEE_terminating_thread__INFO = Terminatig Worker Thread in Async Primitive Service: {0} UIMAEE_worker_threads_done__INFO = All Worker Threads Terminated in Async Primitive Service: {0} -UIMAEE_killing_process__SEVERE = !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n\t<<<<<<<<<<<<<< Service {0} Killed With System.exit(0) >>>>>>>>>>>>>>>>>>>>>>>>>\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +UIMAEE_killing_process__SEVERE = !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n\t<<<<<<<<<<<<<< Service {0} Killed With System.exit(0). The service's deployment descriptor error handling configured with thresholdAction=terminate on process error >>>>>>>>>>>>>>>>>>>>>>>>>\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! UIMAEE_rceived_work__FINE = Worker Thread: {0} Received Work. Cas ReferenceId: {1} UIMAEE_cas_notin_cache__WARNING = Worker Thread: {0} Cas ReferenceId: {1} Not In Cache. UIMAEE_produced_new_cas__FINE = Worker Thread: {0} Controller: {1} Produced New Cas: {2} From Input Cas: {3} Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1344426&r1=1344425&r2=1344426&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original) +++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Wed May 30 19:27:36 2012 @@ -1263,29 +1263,34 @@ public abstract class BaseUIMAAsynchrono }); } } + casCachedRequest.setHostIpProcessingCAS(message.getStringProperty(AsynchAEMessage.ServerIP)); + if (message.getJMSReplyTo() != null && serviceDelegate.isCasPendingReply(casReferenceId)) { + casCachedRequest.setFreeCasNotificationQueue(message.getJMSReplyTo()); + } } + /* + List<DelegateEntry> outstandingCasList = serviceDelegate.getDelegateCasesPendingReply(); + for (DelegateEntry entry : outstandingCasList) { + if (entry.getCasReferenceId().equals(casReferenceId)) { + // The Cas is still being processed + //ClientRequest casCachedRequest = (ClientRequest) clientCache.get(casReferenceId); + if (casCachedRequest != null) { + casCachedRequest.setFreeCasNotificationQueue(message.getJMSReplyTo()); + } + return; + } + } + */ +// } +// if ( casReferenceId != null && getCache().containsKey(casReferenceId) ) { +// //ClientRequest casCachedRequest = (ClientRequest) clientCache.get(casReferenceId); +// casCachedRequest.setHostIpProcessingCAS(message.getStringProperty(AsynchAEMessage.ServerIP)); +// } } catch( Exception e) { UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "handleServiceInfo", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e); } - if (message.getJMSReplyTo() != null) { - List<DelegateEntry> outstandingCasList = serviceDelegate.getDelegateCasesPendingReply(); - for (DelegateEntry entry : outstandingCasList) { - if (entry.getCasReferenceId().equals(casReferenceId)) { - // The Cas is still being processed - //ClientRequest casCachedRequest = (ClientRequest) clientCache.get(casReferenceId); - if (casCachedRequest != null) { - casCachedRequest.setFreeCasNotificationQueue(message.getJMSReplyTo()); - } - return; - } - } - } - if ( casReferenceId != null && getCache().containsKey(casReferenceId) ) { - //ClientRequest casCachedRequest = (ClientRequest) clientCache.get(casReferenceId); - casCachedRequest.setHostIpProcessingCAS(message.getStringProperty(AsynchAEMessage.ServerIP)); - } } protected void decrementOutstandingCasCounter() { // Received a reply, decrement number of outstanding CASes