Author: cwiklik Date: Tue Dec 13 16:23:19 2011 New Revision: 1213768 URL: http://svn.apache.org/viewvc?rev=1213768&view=rev Log: UIMA-2180 refactor code handling per cas performance breakdown list
Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1213768&r1=1213767&r2=1213768&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original) +++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Tue Dec 13 16:23:19 2011 @@ -210,6 +210,8 @@ public abstract class BaseUIMAAsynchrono private ExecutorService exec = Executors.newFixedThreadPool(1); + private volatile boolean casMultiplierDelegate; + abstract public String getEndPointName() throws Exception; abstract protected TextMessage createTextMessage() throws Exception; @@ -745,7 +747,7 @@ public abstract class BaseUIMAAsynchrono try { getMetaSemaphore.acquire(); } catch (InterruptedException e) { - + e.printStackTrace(); } finally { getMetaSemaphore.release(); } @@ -1073,6 +1075,7 @@ public abstract class BaseUIMAAsynchrono // Adam - store ResouceMetaData in field so we can return it from getMetaData(). resourceMetadata = (ProcessingResourceMetaData) UIMAFramework.getXMLParser() .parseResourceMetaData(in1); + casMultiplierDelegate = resourceMetadata.getOperationalProperties().getOutputsNewCASes(); if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "handleMetadataReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, @@ -1097,13 +1100,16 @@ public abstract class BaseUIMAAsynchrono if ( aCommand == AsynchAEMessage.Process) { for (int i = 0; listeners != null && i < listeners.size(); i++) { UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) listeners.get(i); - XStream xstream = new XStream(new DomDriver()); statCL.entityProcessComplete(aCAS, aStatus, - (List<AnalysisEnginePerformanceMetrics>)xstream.fromXML(serializedComponentStats)); + deserializePerformanceMetrics(serializedComponentStats)); } } } - + @SuppressWarnings("unchecked") + private List<AnalysisEnginePerformanceMetrics> deserializePerformanceMetrics(String serializedComponentStats) { + XStream xstream = new XStream(new DomDriver()); + return (List<AnalysisEnginePerformanceMetrics>)xstream.fromXML(serializedComponentStats); + } protected void notifyListeners(CAS aCAS, EntityProcessStatus aStatus, int aCommand) { for (int i = 0; listeners != null && i < listeners.size(); i++) { UimaAsBaseCallbackListener statCL = (UimaAsBaseCallbackListener) listeners.get(i); @@ -1231,20 +1237,7 @@ public abstract class BaseUIMAAsynchrono String.valueOf(casCachedRequest.getCAS().hashCode()) }); } - } else { - UIMAFramework.getLogger(CLASS_NAME).logrb( - Level.INFO, - CLASS_NAME.getName(), - "handleServiceInfo", - JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_skipping_onBeforeProcessCAS_INFO", - new Object[] { - casReferenceId, - String.valueOf(casCachedRequest.getCAS().hashCode()), - nodeIP, pid - }); - - } + } } } catch( Exception e) { UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), @@ -1315,6 +1308,14 @@ public abstract class BaseUIMAAsynchrono handleException(message, cachedRequest, true); return; } + // cachedRequest is only null if we are receiving child CASes from a + // Cas Multiplier. Otherwise, we drop the message as it is out of band + if ( cachedRequest == null && !casMultiplierDelegate ) { + // most likely a reply came in after the thread was interrupted + return; + } + + // If the Cas Reference id not in the message check if the message contains an // exception and if so, handle the exception and return. if (casReferenceId == null) { @@ -1577,6 +1578,14 @@ public abstract class BaseUIMAAsynchrono // Add CAS identifier to enable matching replies with requests notifyListeners(cas, status, AsynchAEMessage.Process); } + } else { // synchronous sendAndReceive() was used + if (casReferenceId != null && message.propertyExists(AsynchAEMessage.CASPerComponentMetrics) ) { + cachedRequest = (ClientRequest) clientCache.get(casReferenceId); + if ( cachedRequest != null && cachedRequest.getComponentMetricsList() != null ) { + cachedRequest.getComponentMetricsList(). + addAll(deserializePerformanceMetrics(message.getStringProperty(AsynchAEMessage.CASPerComponentMetrics))); + } + } } } finally { // Dont release the CAS if the application uses synchronous API @@ -1895,12 +1904,21 @@ public abstract class BaseUIMAAsynchrono public ProcessingResourceMetaData getMetaData() throws ResourceInitializationException { return resourceMetadata; } + public String sendAndReceiveCAS(CAS aCAS) throws ResourceProcessException { + return sendAndReceiveCAS(aCAS, null, null); + } + public String sendAndReceiveCAS(CAS aCAS, ProcessTrace pt) throws ResourceProcessException { + return sendAndReceiveCAS(aCAS, pt, null); + } + public String sendAndReceiveCAS(CAS aCAS, List<AnalysisEnginePerformanceMetrics> componentMetricsList) throws ResourceProcessException { + return sendAndReceiveCAS(aCAS, null, componentMetricsList); + } /** * This is a synchronous method which sends a message to a destination and blocks waiting for a * reply. */ - public String sendAndReceiveCAS(CAS aCAS, ProcessTrace pt) throws ResourceProcessException { + public String sendAndReceiveCAS(CAS aCAS, ProcessTrace pt, List<AnalysisEnginePerformanceMetrics> componentMetricsList) throws ResourceProcessException { if (!running) { throw new ResourceProcessException(new Exception("Uima EE Client Not In Running State")); } @@ -1926,6 +1944,11 @@ public abstract class BaseUIMAAsynchrono ClientRequest cachedRequest = produceNewClientRequestObject(); cachedRequest.setSynchronousInvocation(); + + // save application provided List where the performance stats will be copied + // when reply comes back + cachedRequest.setComponentMetricsList(componentMetricsList); + // This is synchronous call, acquire and hold the semaphore before // dispatching a CAS to a service. The semaphore will be released // iff: @@ -1949,7 +1972,7 @@ public abstract class BaseUIMAAsynchrono // for the second oldest CAS in the outstanding list. serviceDelegate.cancelTimerForCasOrPurge(casReferenceId); throw new ResourceProcessException(e); - } + } } try { // send CAS. This call does not block. Instead we will block the sending thread below. @@ -1962,6 +1985,7 @@ public abstract class BaseUIMAAsynchrono } catch( ResourceProcessException e) { threadMonitor.getMonitor().release(); + removeFromCache(casReferenceId); throw e; } if (threadMonitor != null && threadMonitor.getMonitor() != null) { @@ -2004,12 +2028,18 @@ public abstract class BaseUIMAAsynchrono if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendAndReceiveCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_client_interrupted_INFO", new Object[] { casReferenceId, aCAS.hashCode()}); + "UIMAJMS_client_interrupted_INFO", new Object[] { Thread.currentThread().getId(), casReferenceId, String.valueOf(aCAS.hashCode())}); } // cancel the timer if it is associated with a CAS this thread is waiting for. This would be // the oldest CAS submitted to a queue for processing. The timer will be canceled and restarted // for the second oldest CAS in the outstanding list. serviceDelegate.cancelTimerForCasOrPurge(casReferenceId); + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), + "sendAndReceiveCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_client_canceled_timer_INFO", new Object[] { Thread.currentThread().getId(), casReferenceId, String.valueOf(aCAS.hashCode())}); + } + removeFromCache(casReferenceId); throw new ResourceProcessException(e); } finally { threadMonitor.getMonitor().release(); @@ -2071,10 +2101,6 @@ public abstract class BaseUIMAAsynchrono } } - public String sendAndReceiveCAS(CAS aCAS) throws ResourceProcessException { - return sendAndReceiveCAS(aCAS, null); - } - protected void notifyOnTimout(CAS aCAS, String anEndpoint, int aTimeoutKind, String casReferenceId) { ProcessTrace pt = new ProcessTrace_impl(); @@ -2272,7 +2298,18 @@ public abstract class BaseUIMAAsynchrono private String hostIpProcessingCAS; - public String getHostIpProcessingCAS() { + List<AnalysisEnginePerformanceMetrics> componentMetricsList; + + public List<AnalysisEnginePerformanceMetrics> getComponentMetricsList() { + return componentMetricsList; + } + + public void setComponentMetricsList( + List<AnalysisEnginePerformanceMetrics> componentMetricsList) { + this.componentMetricsList = componentMetricsList; + } + + public String getHostIpProcessingCAS() { return hostIpProcessingCAS; }