Author: cwiklik
Date: Tue Dec 13 16:23:19 2011
New Revision: 1213768

URL: http://svn.apache.org/viewvc?rev=1213768&view=rev
Log:
UIMA-2180 refactor code handling per cas performance breakdown list  

Modified:
    
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java

Modified: 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1213768&r1=1213767&r2=1213768&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
 Tue Dec 13 16:23:19 2011
@@ -210,6 +210,8 @@ public abstract class BaseUIMAAsynchrono
 
   private ExecutorService exec = Executors.newFixedThreadPool(1);
   
+  private volatile boolean casMultiplierDelegate;
+  
   abstract public String getEndPointName() throws Exception;
 
   abstract protected TextMessage createTextMessage() throws Exception;
@@ -745,7 +747,7 @@ public abstract class BaseUIMAAsynchrono
     try {
       getMetaSemaphore.acquire();
     } catch (InterruptedException e) {
-
+       e.printStackTrace();
     } finally {
       getMetaSemaphore.release();
     }
@@ -1073,6 +1075,7 @@ public abstract class BaseUIMAAsynchrono
         // Adam - store ResouceMetaData in field so we can return it from 
getMetaData().
         resourceMetadata = (ProcessingResourceMetaData) 
UIMAFramework.getXMLParser()
                 .parseResourceMetaData(in1);
+        casMultiplierDelegate = 
resourceMetadata.getOperationalProperties().getOutputsNewCASes();
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, 
CLASS_NAME.getName(),
                   "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
@@ -1097,13 +1100,16 @@ public abstract class BaseUIMAAsynchrono
     if ( aCommand == AsynchAEMessage.Process) {
       for (int i = 0; listeners != null && i < listeners.size(); i++) {
         UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) 
listeners.get(i);
-        XStream xstream = new XStream(new DomDriver());
         statCL.entityProcessComplete(aCAS, aStatus, 
-                
(List<AnalysisEnginePerformanceMetrics>)xstream.fromXML(serializedComponentStats));
+                       
deserializePerformanceMetrics(serializedComponentStats));
       }
     }
   }
-
+  @SuppressWarnings("unchecked")
+  private List<AnalysisEnginePerformanceMetrics> 
deserializePerformanceMetrics(String serializedComponentStats) {
+      XStream xstream = new XStream(new DomDriver());
+      return 
(List<AnalysisEnginePerformanceMetrics>)xstream.fromXML(serializedComponentStats);
+  }
   protected void notifyListeners(CAS aCAS, EntityProcessStatus aStatus, int 
aCommand) {
     for (int i = 0; listeners != null && i < listeners.size(); i++) {
       UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) 
listeners.get(i);
@@ -1231,20 +1237,7 @@ public abstract class BaseUIMAAsynchrono
                                 
String.valueOf(casCachedRequest.getCAS().hashCode())
                               });
                   }
-            } else {
-                 UIMAFramework.getLogger(CLASS_NAME).logrb(
-                         Level.INFO,
-                         CLASS_NAME.getName(),
-                         "handleServiceInfo",
-                         JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                         "UIMAJMS_skipping_onBeforeProcessCAS_INFO",
-                         new Object[] {
-                                 casReferenceId,
-                                
String.valueOf(casCachedRequest.getCAS().hashCode()),
-                                nodeIP, pid 
-                         });
-                
-            }
+            } 
        }
     } catch( Exception e) {
       UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
getClass().getName(),
@@ -1315,6 +1308,14 @@ public abstract class BaseUIMAAsynchrono
       handleException(message, cachedRequest, true);
       return;
     }
+    // cachedRequest is only null if we are receiving child CASes from a 
+    // Cas Multiplier. Otherwise, we drop the message as it is out of band
+    if ( cachedRequest == null && !casMultiplierDelegate ) {
+       // most likely a reply came in after the thread was interrupted
+       return;
+    }
+    
+    
     // If the Cas Reference id not in the message check if the message 
contains an
     // exception and if so, handle the exception and return.
     if (casReferenceId == null) {
@@ -1577,6 +1578,14 @@ public abstract class BaseUIMAAsynchrono
             // Add CAS identifier to enable matching replies with requests
             notifyListeners(cas, status, AsynchAEMessage.Process);
           }
+        } else {  // synchronous sendAndReceive() was used
+            if (casReferenceId != null && 
message.propertyExists(AsynchAEMessage.CASPerComponentMetrics) ) {
+                cachedRequest = (ClientRequest) 
clientCache.get(casReferenceId);
+                if ( cachedRequest != null && 
cachedRequest.getComponentMetricsList() != null ) {
+                       cachedRequest.getComponentMetricsList().
+                               
addAll(deserializePerformanceMetrics(message.getStringProperty(AsynchAEMessage.CASPerComponentMetrics)));
+                }
+            }
         }
       } finally {
         // Dont release the CAS if the application uses synchronous API
@@ -1895,12 +1904,21 @@ public abstract class BaseUIMAAsynchrono
   public ProcessingResourceMetaData getMetaData() throws 
ResourceInitializationException {
     return resourceMetadata;
   }
+  public String sendAndReceiveCAS(CAS aCAS) throws ResourceProcessException {
+    return sendAndReceiveCAS(aCAS, null, null);
+  }
+  public String sendAndReceiveCAS(CAS aCAS, ProcessTrace pt) throws 
ResourceProcessException {
+    return sendAndReceiveCAS(aCAS, pt, null);
+  }
+  public String sendAndReceiveCAS(CAS aCAS, 
List<AnalysisEnginePerformanceMetrics> componentMetricsList) throws 
ResourceProcessException {
+    return sendAndReceiveCAS(aCAS, null, componentMetricsList);
+  }
 
   /**
    * This is a synchronous method which sends a message to a destination and 
blocks waiting for a
    * reply.
    */
-  public String sendAndReceiveCAS(CAS aCAS, ProcessTrace pt) throws 
ResourceProcessException {
+  public String sendAndReceiveCAS(CAS aCAS, ProcessTrace pt, 
List<AnalysisEnginePerformanceMetrics> componentMetricsList) throws 
ResourceProcessException {
     if (!running) {
       throw new ResourceProcessException(new Exception("Uima EE Client Not In 
Running State"));
     }
@@ -1926,6 +1944,11 @@ public abstract class BaseUIMAAsynchrono
 
     ClientRequest cachedRequest = produceNewClientRequestObject();
     cachedRequest.setSynchronousInvocation();
+    
+    // save application provided List where the performance stats will be 
copied
+    //  when reply comes back
+    cachedRequest.setComponentMetricsList(componentMetricsList);
+    
     // This is synchronous call, acquire and hold the semaphore before
     // dispatching a CAS to a service. The semaphore will be released
     // iff:
@@ -1949,7 +1972,7 @@ public abstract class BaseUIMAAsynchrono
        // for the second oldest CAS in the outstanding list.
        serviceDelegate.cancelTimerForCasOrPurge(casReferenceId);
        throw new ResourceProcessException(e);
-      }
+      } 
     }
     try {
       // send CAS. This call does not block. Instead we will block the sending 
thread below.
@@ -1962,6 +1985,7 @@ public abstract class BaseUIMAAsynchrono
 
     } catch( ResourceProcessException e) {
       threadMonitor.getMonitor().release();
+      removeFromCache(casReferenceId);
       throw e;
     }
     if (threadMonitor != null && threadMonitor.getMonitor() != null) {
@@ -2004,12 +2028,18 @@ public abstract class BaseUIMAAsynchrono
                if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) 
{
                 UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, 
CLASS_NAME.getName(),
                         "sendAndReceiveCAS", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                        "UIMAJMS_client_interrupted_INFO", new Object[] { 
casReferenceId, aCAS.hashCode()});
+                        "UIMAJMS_client_interrupted_INFO", new Object[] { 
Thread.currentThread().getId(), casReferenceId, 
String.valueOf(aCAS.hashCode())});
             }
                // cancel the timer if it is associated with a CAS this thread 
is waiting for. This would be
                // the oldest CAS submitted to a queue for processing. The 
timer will be canceled and restarted
                // for the second oldest CAS in the outstanding list.
                serviceDelegate.cancelTimerForCasOrPurge(casReferenceId);
+               if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) 
{
+                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, 
CLASS_NAME.getName(),
+                        "sendAndReceiveCAS", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                        "UIMAJMS_client_canceled_timer_INFO", new Object[] { 
Thread.currentThread().getId(), casReferenceId, 
String.valueOf(aCAS.hashCode())});
+            }
+            removeFromCache(casReferenceId);
                throw new ResourceProcessException(e);
         } finally {
           threadMonitor.getMonitor().release();
@@ -2071,10 +2101,6 @@ public abstract class BaseUIMAAsynchrono
     }
   }
 
-  public String sendAndReceiveCAS(CAS aCAS) throws ResourceProcessException {
-    return sendAndReceiveCAS(aCAS, null);
-  }
-
   protected void notifyOnTimout(CAS aCAS, String anEndpoint, int aTimeoutKind, 
String casReferenceId) {
 
     ProcessTrace pt = new ProcessTrace_impl();
@@ -2272,7 +2298,18 @@ public abstract class BaseUIMAAsynchrono
 
     private String hostIpProcessingCAS;
     
-    public String getHostIpProcessingCAS() {
+    List<AnalysisEnginePerformanceMetrics> componentMetricsList;
+    
+    public List<AnalysisEnginePerformanceMetrics> getComponentMetricsList() {
+               return componentMetricsList;
+       }
+
+       public void setComponentMetricsList(
+                       List<AnalysisEnginePerformanceMetrics> 
componentMetricsList) {
+               this.componentMetricsList = componentMetricsList;
+       }
+
+       public String getHostIpProcessingCAS() {
       return hostIpProcessingCAS;
     }
 


Reply via email to