Author: cwiklik
Date: Wed Jul 27 17:14:20 2011
New Revision: 1151538

URL: http://svn.apache.org/viewvc?rev=1151538&view=rev
Log:
UIMA-2188 Release CAS back to its CAS pool if deserialization fails

Modified:
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java?rev=1151538&r1=1151537&r2=1151538&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java
 Wed Jul 27 17:14:20 2011
@@ -171,124 +171,133 @@ public class ProcessRequestHandler_impl 
     // from the service CAS Pool.
     // 
*************************************************************************
     Endpoint endpoint = aMessageContext.getEndpoint();
-
-    CAS cas = 
getCAS(aMessageContext.propertyExists(AsynchAEMessage.CasSequence), 
shadowCasPoolKey,
-            endpoint.getEndpoint());
-    long timeWaitingForCAS = getController().getCpuTime() - t1;
-    // Check if we are still running
-    if (getController().isStopped()) {
-      // The Controller is in shutdown state.
-      getController().dropCAS(cas);
-      return null;
-    }
-    // 
*************************************************************************
-    // Deserialize CAS from the message
-    // 
*************************************************************************
-    t1 = getController().getCpuTime();
-    String serializationStrategy = endpoint.getSerializer();
-    XmiSerializationSharedData deserSharedData = null;
+    CAS cas = null;
     CacheEntry entry = null;
     
-    UimaSerializer uimaSerializer = 
SerializerCache.lookupSerializerByThreadId();
-    if (serializationStrategy.equals("xmi")) {
-      // Fetch serialized CAS from the message
-      String xmi = aMessageContext.getStringMessage();
-      deserSharedData = new XmiSerializationSharedData();
-      uimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, 
-1);
-    } else if (serializationStrategy.equals("binary")) {
+    try {
+      cas = 
getCAS(aMessageContext.propertyExists(AsynchAEMessage.CasSequence), 
shadowCasPoolKey,
+              endpoint.getEndpoint());
+      long timeWaitingForCAS = getController().getCpuTime() - t1;
+      // Check if we are still running
+      if (getController().isStopped()) {
+        // The Controller is in shutdown state.
+        getController().dropCAS(cas);
+        return null;
+      }
+      // 
*************************************************************************
+      // Deserialize CAS from the message
+      // 
*************************************************************************
+      t1 = getController().getCpuTime();
+      String serializationStrategy = endpoint.getSerializer();
+      XmiSerializationSharedData deserSharedData = null;
+      
+      UimaSerializer uimaSerializer = 
SerializerCache.lookupSerializerByThreadId();
+      if (serializationStrategy.equals("xmi")) {
+        // Fetch serialized CAS from the message
+        String xmi = aMessageContext.getStringMessage();
+        deserSharedData = new XmiSerializationSharedData();
+        uimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, 
-1);
+      } else if (serializationStrategy.equals("binary")) {
+        // 
*************************************************************************
+        // Register the CAS with a local cache
+        // 
*************************************************************************
+        // CacheEntry entry = 
getController().getInProcessCache().register(cas, aMessageContext,
+        // deserSharedData, casReferenceId);
+        byte[] binarySource = aMessageContext.getByteMessage();
+        uimaSerializer.deserializeCasFromBinary(binarySource, cas);
+      }
+
+      // 
*************************************************************************
+      // Check and set up for Delta CAS reply
+      // 
*************************************************************************
+      boolean acceptsDeltaCas = false;
+      Marker marker = null;
+      if (aMessageContext.propertyExists(AsynchAEMessage.AcceptsDeltaCas)) {
+        acceptsDeltaCas = 
aMessageContext.getMessageBooleanProperty(AsynchAEMessage.AcceptsDeltaCas);
+        if (acceptsDeltaCas) {
+          marker = cas.createMarker();
+        }
+      }
       // 
*************************************************************************
       // Register the CAS with a local cache
       // 
*************************************************************************
       // CacheEntry entry = getController().getInProcessCache().register(cas, 
aMessageContext,
       // deserSharedData, casReferenceId);
-      byte[] binarySource = aMessageContext.getByteMessage();
-      uimaSerializer.deserializeCasFromBinary(binarySource, cas);
-    }
-
-    // 
*************************************************************************
-    // Check and set up for Delta CAS reply
-    // 
*************************************************************************
-    boolean acceptsDeltaCas = false;
-    Marker marker = null;
-    if (aMessageContext.propertyExists(AsynchAEMessage.AcceptsDeltaCas)) {
-      acceptsDeltaCas = 
aMessageContext.getMessageBooleanProperty(AsynchAEMessage.AcceptsDeltaCas);
-      if (acceptsDeltaCas) {
-        marker = cas.createMarker();
-      }
-    }
-    // 
*************************************************************************
-    // Register the CAS with a local cache
-    // 
*************************************************************************
-    // CacheEntry entry = getController().getInProcessCache().register(cas, 
aMessageContext,
-    // deserSharedData, casReferenceId);
-    entry = getController().getInProcessCache().register(cas, aMessageContext, 
deserSharedData,
-            casReferenceId, marker, acceptsDeltaCas);
-
-    long timeToDeserializeCAS = getController().getCpuTime() - t1;
-    getController().incrementDeserializationTime(timeToDeserializeCAS);
-    LongNumericStatistic statistic;
-    if ((statistic = getController().getMonitor().getLongNumericStatistic("",
-            Monitor.TotalDeserializeTime)) != null) {
-      statistic.increment(timeToDeserializeCAS);
-    }
-    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, 
CLASS_NAME.getName(),
-              "handleProcessRequestWithXMI", 
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-              "UIMAEE_deserialize_cas_time_FINE",
-              new Object[] { (double) timeToDeserializeCAS / 1000000.0 });
-    }
+      entry = getController().getInProcessCache().register(cas, 
aMessageContext, deserSharedData,
+              casReferenceId, marker, acceptsDeltaCas);
 
-    // Update Stats
-    ServicePerformance casStats = 
getController().getCasStatistics(casReferenceId);
-    casStats.incrementCasDeserializationTime(timeToDeserializeCAS);
-    if (getController().isTopLevelComponent()) {
-      synchronized (mux) {
-        
getController().getServicePerformance().incrementCasDeserializationTime(
-                timeToDeserializeCAS);
+      long timeToDeserializeCAS = getController().getCpuTime() - t1;
+      getController().incrementDeserializationTime(timeToDeserializeCAS);
+      LongNumericStatistic statistic;
+      if ((statistic = getController().getMonitor().getLongNumericStatistic("",
+              Monitor.TotalDeserializeTime)) != null) {
+        statistic.increment(timeToDeserializeCAS);
       }
-    }
-    getController().saveTime(inTime, casReferenceId, 
getController().getName());
-
-    if (getController() instanceof AggregateAnalysisEngineController) {
-      // If the message came from a Cas Multiplier, associate the input/parent 
CAS id with this CAS
-      if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
-        // Fetch parent CAS id
-        String inputCasReferenceId = aMessageContext
-                .getMessageStringProperty(AsynchAEMessage.InputCasReference);
-        if (shadowCasPoolKey != null) {
-          // Save the key of the Cas Multiplier in the cache. It will be now 
known which Cas
-          // Multiplier produced this CAS
-          entry.setCasProducerKey(shadowCasPoolKey);
-        }
-        // associate this subordinate CAS with the parent CAS
-        entry.setInputCasReferenceId(inputCasReferenceId);
-        // Save a Cas Multiplier endpoint where a Free CAS notification will 
be sent
-        entry.setFreeCasEndpoint(freeCasEndpoint);
-        cacheStats(inputCasReferenceId, timeWaitingForCAS, 
timeToDeserializeCAS);
+      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, 
CLASS_NAME.getName(),
+                "handleProcessRequestWithXMI", 
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAEE_deserialize_cas_time_FINE",
+                new Object[] { (double) timeToDeserializeCAS / 1000000.0 });
+      }
+
+      // Update Stats
+      ServicePerformance casStats = 
getController().getCasStatistics(casReferenceId);
+      casStats.incrementCasDeserializationTime(timeToDeserializeCAS);
+      if (getController().isTopLevelComponent()) {
+        synchronized (mux) {
+          
getController().getServicePerformance().incrementCasDeserializationTime(
+                  timeToDeserializeCAS);
+        }
+      }
+      getController().saveTime(inTime, casReferenceId, 
getController().getName());
+
+      if (getController() instanceof AggregateAnalysisEngineController) {
+        // If the message came from a Cas Multiplier, associate the 
input/parent CAS id with this CAS
+        if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) {
+          // Fetch parent CAS id
+          String inputCasReferenceId = aMessageContext
+                  .getMessageStringProperty(AsynchAEMessage.InputCasReference);
+          if (shadowCasPoolKey != null) {
+            // Save the key of the Cas Multiplier in the cache. It will be now 
known which Cas
+            // Multiplier produced this CAS
+            entry.setCasProducerKey(shadowCasPoolKey);
+          }
+          // associate this subordinate CAS with the parent CAS
+          entry.setInputCasReferenceId(inputCasReferenceId);
+          // Save a Cas Multiplier endpoint where a Free CAS notification will 
be sent
+          entry.setFreeCasEndpoint(freeCasEndpoint);
+          cacheStats(inputCasReferenceId, timeWaitingForCAS, 
timeToDeserializeCAS);
+        } else {
+          cacheStats(casReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
+        }
+        DelegateStats stats = new DelegateStats();
+        if (entry.getStat() == null) {
+          entry.setStat(stats);
+          // Add entry for self (this aggregate). 
MessageContext.getEndpointName()
+          // returns the name of the queue receiving the message.
+          stats.put(getController().getServiceEndpointName(), new 
TimerStats());
+        } else {
+          if (!stats.containsKey(getController().getServiceEndpointName())) {
+            stats.put(getController().getServiceEndpointName(), new 
DelegateStats());
+          }
+        }
       } else {
         cacheStats(casReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
       }
-      DelegateStats stats = new DelegateStats();
-      if (entry.getStat() == null) {
-        entry.setStat(stats);
-        // Add entry for self (this aggregate). 
MessageContext.getEndpointName()
-        // returns the name of the queue receiving the message.
-        stats.put(getController().getServiceEndpointName(), new TimerStats());
-      } else {
-        if (!stats.containsKey(getController().getServiceEndpointName())) {
-          stats.put(getController().getServiceEndpointName(), new 
DelegateStats());
-        }
+      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, 
CLASS_NAME.getName(),
+                "handleProcessRequestWithXMI", 
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                "UIMAEE_deserialized_cas_ready_to_process_FINE",
+                new Object[] { aMessageContext.getEndpoint().getEndpoint() });
       }
-    } else {
-      cacheStats(casReferenceId, timeWaitingForCAS, timeToDeserializeCAS);
-    }
-    if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, 
CLASS_NAME.getName(),
-              "handleProcessRequestWithXMI", 
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-              "UIMAEE_deserialized_cas_ready_to_process_FINE",
-              new Object[] { aMessageContext.getEndpoint().getEndpoint() });
+      cacheProcessCommandInClientEndpoint();
+       
+    } catch( Exception e) {
+      if ( cas != null ) {
+        cas.release();
+      }
+      throw e;
     }
-    cacheProcessCommandInClientEndpoint();
     return entry;
   }
 


Reply via email to