Author: cwiklik
Date: Wed May 30 19:27:36 2012
New Revision: 1344426

URL: http://svn.apache.org/viewvc?rev=1344426&view=rev
Log:
UIMA-2414 Refactored code to avoid ConcurrentModificationException while 
handling ServiceInfo message. 

Modified:
    
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties
    
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=1344426&r1=1344425&r2=1344426&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
 Wed May 30 19:27:36 2012
@@ -1092,15 +1092,18 @@ public class BaseUIMAAsynchronousEngine_
    * 
    */
   public void stopProducingCases() {
-    List<DelegateEntry> outstandingCasList = 
serviceDelegate.getDelegateCasesPendingReply();
-    for (DelegateEntry entry : outstandingCasList) {
-      // The Cas is still being processed
-      ClientRequest clientCachedRequest = (ClientRequest) clientCache
-              .get(entry.getCasReferenceId());
-      if (clientCachedRequest != null && !clientCachedRequest.isMetaRequest()
-              && clientCachedRequest.getCasReferenceId() != null) {
-        stopProducingCases(clientCachedRequest);
-      } 
+    
+    String[] casIdsPendingReply = 
serviceDelegate.getDelegateCasIdsPendingReply();
+    if ( casIdsPendingReply != null && casIdsPendingReply.length > 0 ) {
+      for( String casReferenceId : casIdsPendingReply ) {
+        // The Cas is still being processed
+        ClientRequest clientCachedRequest = (ClientRequest) clientCache
+                .get(casReferenceId);
+        if (clientCachedRequest != null && !clientCachedRequest.isMetaRequest()
+                && clientCachedRequest.getCasReferenceId() != null) {
+          stopProducingCases(casReferenceId, 
clientCachedRequest.getFreeCasNotificationQueue());
+        } 
+      }
     }
   }
 
@@ -1114,31 +1117,31 @@ public class BaseUIMAAsynchronousEngine_
     // The Cas is still being processed
     ClientRequest clientCachedRequest = (ClientRequest) 
clientCache.get(aCasReferenceId);
     if (clientCachedRequest != null) {
-      stopProducingCases(clientCachedRequest);
+      stopProducingCases(aCasReferenceId, 
clientCachedRequest.getFreeCasNotificationQueue());
     }
   }
 
-  private void stopProducingCases(ClientRequest clientCachedRequest) {
+//  private void stopProducingCases(ClientRequest clientCachedRequest) {
+  private void stopProducingCases(String casReferenceId, Destination 
cmFreeCasQueue) {
     try {
-      if (clientCachedRequest.getFreeCasNotificationQueue() != null) {
+//      if (clientCachedRequest.getFreeCasNotificationQueue() != null) {
+      if (cmFreeCasQueue != null) {
         TextMessage msg = createTextMessage();
         msg.setText("");
         msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
-        msg
-                .setStringProperty(AsynchAEMessage.CasReference, 
clientCachedRequest
-                        .getCasReferenceId());
+//        msg.setStringProperty(AsynchAEMessage.CasReference, 
clientCachedRequest.getCasReferenceId());
+        msg.setStringProperty(AsynchAEMessage.CasReference, casReferenceId);
         msg.setIntProperty(AsynchAEMessage.MessageType, 
AsynchAEMessage.Request);
         msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Stop);
         msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
         try {
-          MessageProducer msgProducer = getMessageProducer(clientCachedRequest
-                  .getFreeCasNotificationQueue());
+          MessageProducer msgProducer = getMessageProducer(cmFreeCasQueue);
           if (msgProducer != null) {
                  
               if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
                   UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, 
CLASS_NAME.getName(),
                           "stopProducingCases", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                          "UIMAJMS_client_sending_stop_to_service__INFO", new 
Object[] 
{clientCachedRequest.getCasReferenceId(),clientCachedRequest.getFreeCasNotificationQueue()});
+                          "UIMAJMS_client_sending_stop_to_service__INFO", new 
Object[] {casReferenceId,cmFreeCasQueue});
               }
             // Send STOP message to Cas Multiplier Service
             msgProducer.send(msg);

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1344426&r1=1344425&r2=1344426&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
 Wed May 30 19:27:36 2012
@@ -2150,21 +2150,18 @@ public abstract class BaseAnalysisEngine
           // Fetch the Delegate object corresponding to the current key
           Delegate delegate = ((AggregateAnalysisEngineController) this)
                   .lookupDelegate((String) entry.getKey());
-
           if (delegate != null) {
             // Get a list of all CASes this aggregate has dispatched to the 
Cas Multiplier
-            List<DelegateEntry> pendingList = 
delegate.getDelegateCasesPendingReply();
-            if (pendingList != null) {
-              Iterator<DelegateEntry> it2 = pendingList.iterator();
+            String[] pendingReplyCasIds = 
delegate.getDelegateCasIdsPendingReply();
+            if (pendingReplyCasIds != null && pendingReplyCasIds.length > 0 ) {
               // For each CAS pending reply send a Stop message to the CM
-              while (it2.hasNext()) {
-                DelegateEntry delegateEntry = it2.next();
+              for(String casReferenceId : pendingReplyCasIds ) {
                 if (endpoint.isRemote()) {
-                  stopCasMultiplier(delegate, 
delegateEntry.getCasReferenceId());
+                  stopCasMultiplier(delegate, casReferenceId);
                 } else {
                   AnalysisEngineController delegateCasMultiplier = 
lookupDelegateController(endpoint
                           .getEndpoint());
-                  
delegateCasMultiplier.addAbortedCasReferenceId(delegateEntry.getCasReferenceId());
+                  
delegateCasMultiplier.addAbortedCasReferenceId(casReferenceId);
                 }
               }
             }

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java?rev=1344426&r1=1344425&r2=1344426&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
 Wed May 30 19:27:36 2012
@@ -197,8 +197,21 @@ public abstract class Delegate {
     }
   }
 
-  public List<DelegateEntry> getDelegateCasesPendingReply() {
-    return outstandingCasList;
+  /**
+   * Returns an array of Cas Reference Ids representing all CASes pending
+   * reply. 
+   * 
+   * @return - String[] - ids of pending CASes
+   */
+  public String[] getDelegateCasIdsPendingReply() {
+    synchronized (outstandingCasList) {
+      String[] casIdsPendingReply  = new String[outstandingCasList.size()];
+      int inx=0;
+      for (DelegateEntry entry : outstandingCasList) {
+        casIdsPendingReply[inx++] = entry.getCasReferenceId();
+      }
+      return casIdsPendingReply;
+    }
   }
 
   public List<DelegateEntry> getDelegateCasesPendingDispatch() {
@@ -488,6 +501,22 @@ public abstract class Delegate {
   }
 
   /**
+   * Returns true if a given CAS is found in the outstandingCasList. Returns 
false otherwise
+   * 
+   * @param aCasReferenceId
+   *          - id of a CAS to find in an outstandingCasList
+   */
+  public boolean isCasPendingReply(String aCasReferenceId) {
+    synchronized (outstandingCasList) {
+      DelegateEntry entry = lookupEntry(aCasReferenceId, outstandingCasList);
+      if (entry != null) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
    * Removes {@link DelegateEntry} from the list of CASes pending reply. The 
entry is removed when
    * either: 1) reply is received from the delegate before the timeout 2) the 
timeout occurs with no
    * retry 3) an error occurs and the CAS is dropped as part of Error Handling

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties?rev=1344426&r1=1344425&r2=1344426&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties 
(original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/resources/uimaee_messages.properties 
Wed May 30 19:27:36 2012
@@ -55,7 +55,7 @@ UIMAEE_terminating_service__INFO = Servi
 UIMAEE_primitive_ctrl_init_info__CONFIG - Initializing Async Primitive 
Service. Number of Worker Threads: {0}
 UIMAEE_terminating_thread__INFO = Terminatig Worker Thread in Async Primitive 
Service: {0}
 UIMAEE_worker_threads_done__INFO = All Worker Threads Terminated in Async 
Primitive Service: {0}
-UIMAEE_killing_process__SEVERE = 
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n\t<<<<<<<<<<<<<< Service 
{0} Killed With System.exit(0) 
>>>>>>>>>>>>>>>>>>>>>>>>>\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+UIMAEE_killing_process__SEVERE = 
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n\t<<<<<<<<<<<<<< Service 
{0} Killed With System.exit(0). The service's deployment descriptor error 
handling configured with thresholdAction=terminate on process error  
>>>>>>>>>>>>>>>>>>>>>>>>>\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
 UIMAEE_rceived_work__FINE = Worker Thread: {0} Received Work. Cas ReferenceId: 
{1}
 UIMAEE_cas_notin_cache__WARNING = Worker Thread: {0} Cas ReferenceId: {1} Not 
In Cache.
 UIMAEE_produced_new_cas__FINE = Worker Thread: {0} Controller: {1} Produced 
New Cas: {2} From Input Cas: {3}

Modified: 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1344426&r1=1344425&r2=1344426&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
 Wed May 30 19:27:36 2012
@@ -1263,29 +1263,34 @@ public abstract class BaseUIMAAsynchrono
                               });
                   }
             } 
+             
casCachedRequest.setHostIpProcessingCAS(message.getStringProperty(AsynchAEMessage.ServerIP));
+             if (message.getJMSReplyTo() != null && 
serviceDelegate.isCasPendingReply(casReferenceId)) {
+               
casCachedRequest.setFreeCasNotificationQueue(message.getJMSReplyTo());
+             }
        }
+  /*      
+        List<DelegateEntry> outstandingCasList = 
serviceDelegate.getDelegateCasesPendingReply();
+        for (DelegateEntry entry : outstandingCasList) {
+          if (entry.getCasReferenceId().equals(casReferenceId)) {
+            // The Cas is still being processed
+            //ClientRequest casCachedRequest = (ClientRequest) 
clientCache.get(casReferenceId);
+            if (casCachedRequest != null) {
+              
casCachedRequest.setFreeCasNotificationQueue(message.getJMSReplyTo());
+            }
+            return;
+          }
+        }
+   */
+//      }
+//      if ( casReferenceId != null && getCache().containsKey(casReferenceId) 
) {
+//        //ClientRequest casCachedRequest = (ClientRequest) 
clientCache.get(casReferenceId);
+//        
casCachedRequest.setHostIpProcessingCAS(message.getStringProperty(AsynchAEMessage.ServerIP));
+//      }
     } catch( Exception e) {
       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
getClass().getName(),
               "handleServiceInfo", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
               "UIMAEE_exception__WARNING", e);
     }
-    if (message.getJMSReplyTo() != null) {
-      List<DelegateEntry> outstandingCasList = 
serviceDelegate.getDelegateCasesPendingReply();
-      for (DelegateEntry entry : outstandingCasList) {
-        if (entry.getCasReferenceId().equals(casReferenceId)) {
-          // The Cas is still being processed
-          //ClientRequest casCachedRequest = (ClientRequest) 
clientCache.get(casReferenceId);
-          if (casCachedRequest != null) {
-            
casCachedRequest.setFreeCasNotificationQueue(message.getJMSReplyTo());
-          }
-          return;
-        }
-      }
-    }
-    if ( casReferenceId != null && getCache().containsKey(casReferenceId) ) {
-      //ClientRequest casCachedRequest = (ClientRequest) 
clientCache.get(casReferenceId);
-      
casCachedRequest.setHostIpProcessingCAS(message.getStringProperty(AsynchAEMessage.ServerIP));
-    }
   }
   protected void decrementOutstandingCasCounter() {
     // Received a reply, decrement number of outstanding CASes


Reply via email to