Author: cwiklik Date: Wed Jul 27 17:14:20 2011 New Revision: 1151538 URL: http://svn.apache.org/viewvc?rev=1151538&view=rev Log: UIMA-2188 Release CAS back to its CAS pool if deserialization fails
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java?rev=1151538&r1=1151537&r2=1151538&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java (original) +++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java Wed Jul 27 17:14:20 2011 @@ -171,124 +171,133 @@ public class ProcessRequestHandler_impl // from the service CAS Pool. // ************************************************************************* Endpoint endpoint = aMessageContext.getEndpoint(); - - CAS cas = getCAS(aMessageContext.propertyExists(AsynchAEMessage.CasSequence), shadowCasPoolKey, - endpoint.getEndpoint()); - long timeWaitingForCAS = getController().getCpuTime() - t1; - // Check if we are still running - if (getController().isStopped()) { - // The Controller is in shutdown state. - getController().dropCAS(cas); - return null; - } - // ************************************************************************* - // Deserialize CAS from the message - // ************************************************************************* - t1 = getController().getCpuTime(); - String serializationStrategy = endpoint.getSerializer(); - XmiSerializationSharedData deserSharedData = null; + CAS cas = null; CacheEntry entry = null; - UimaSerializer uimaSerializer = SerializerCache.lookupSerializerByThreadId(); - if (serializationStrategy.equals("xmi")) { - // Fetch serialized CAS from the message - String xmi = aMessageContext.getStringMessage(); - deserSharedData = new XmiSerializationSharedData(); - uimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1); - } else if (serializationStrategy.equals("binary")) { + try { + cas = getCAS(aMessageContext.propertyExists(AsynchAEMessage.CasSequence), shadowCasPoolKey, + endpoint.getEndpoint()); + long timeWaitingForCAS = getController().getCpuTime() - t1; + // Check if we are still running + if (getController().isStopped()) { + // The Controller is in shutdown state. + getController().dropCAS(cas); + return null; + } + // ************************************************************************* + // Deserialize CAS from the message + // ************************************************************************* + t1 = getController().getCpuTime(); + String serializationStrategy = endpoint.getSerializer(); + XmiSerializationSharedData deserSharedData = null; + + UimaSerializer uimaSerializer = SerializerCache.lookupSerializerByThreadId(); + if (serializationStrategy.equals("xmi")) { + // Fetch serialized CAS from the message + String xmi = aMessageContext.getStringMessage(); + deserSharedData = new XmiSerializationSharedData(); + uimaSerializer.deserializeCasFromXmi(xmi, cas, deserSharedData, true, -1); + } else if (serializationStrategy.equals("binary")) { + // ************************************************************************* + // Register the CAS with a local cache + // ************************************************************************* + // CacheEntry entry = getController().getInProcessCache().register(cas, aMessageContext, + // deserSharedData, casReferenceId); + byte[] binarySource = aMessageContext.getByteMessage(); + uimaSerializer.deserializeCasFromBinary(binarySource, cas); + } + + // ************************************************************************* + // Check and set up for Delta CAS reply + // ************************************************************************* + boolean acceptsDeltaCas = false; + Marker marker = null; + if (aMessageContext.propertyExists(AsynchAEMessage.AcceptsDeltaCas)) { + acceptsDeltaCas = aMessageContext.getMessageBooleanProperty(AsynchAEMessage.AcceptsDeltaCas); + if (acceptsDeltaCas) { + marker = cas.createMarker(); + } + } // ************************************************************************* // Register the CAS with a local cache // ************************************************************************* // CacheEntry entry = getController().getInProcessCache().register(cas, aMessageContext, // deserSharedData, casReferenceId); - byte[] binarySource = aMessageContext.getByteMessage(); - uimaSerializer.deserializeCasFromBinary(binarySource, cas); - } - - // ************************************************************************* - // Check and set up for Delta CAS reply - // ************************************************************************* - boolean acceptsDeltaCas = false; - Marker marker = null; - if (aMessageContext.propertyExists(AsynchAEMessage.AcceptsDeltaCas)) { - acceptsDeltaCas = aMessageContext.getMessageBooleanProperty(AsynchAEMessage.AcceptsDeltaCas); - if (acceptsDeltaCas) { - marker = cas.createMarker(); - } - } - // ************************************************************************* - // Register the CAS with a local cache - // ************************************************************************* - // CacheEntry entry = getController().getInProcessCache().register(cas, aMessageContext, - // deserSharedData, casReferenceId); - entry = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData, - casReferenceId, marker, acceptsDeltaCas); - - long timeToDeserializeCAS = getController().getCpuTime() - t1; - getController().incrementDeserializationTime(timeToDeserializeCAS); - LongNumericStatistic statistic; - if ((statistic = getController().getMonitor().getLongNumericStatistic("", - Monitor.TotalDeserializeTime)) != null) { - statistic.increment(timeToDeserializeCAS); - } - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), - "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAEE_deserialize_cas_time_FINE", - new Object[] { (double) timeToDeserializeCAS / 1000000.0 }); - } + entry = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData, + casReferenceId, marker, acceptsDeltaCas); - // Update Stats - ServicePerformance casStats = getController().getCasStatistics(casReferenceId); - casStats.incrementCasDeserializationTime(timeToDeserializeCAS); - if (getController().isTopLevelComponent()) { - synchronized (mux) { - getController().getServicePerformance().incrementCasDeserializationTime( - timeToDeserializeCAS); + long timeToDeserializeCAS = getController().getCpuTime() - t1; + getController().incrementDeserializationTime(timeToDeserializeCAS); + LongNumericStatistic statistic; + if ((statistic = getController().getMonitor().getLongNumericStatistic("", + Monitor.TotalDeserializeTime)) != null) { + statistic.increment(timeToDeserializeCAS); } - } - getController().saveTime(inTime, casReferenceId, getController().getName()); - - if (getController() instanceof AggregateAnalysisEngineController) { - // If the message came from a Cas Multiplier, associate the input/parent CAS id with this CAS - if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) { - // Fetch parent CAS id - String inputCasReferenceId = aMessageContext - .getMessageStringProperty(AsynchAEMessage.InputCasReference); - if (shadowCasPoolKey != null) { - // Save the key of the Cas Multiplier in the cache. It will be now known which Cas - // Multiplier produced this CAS - entry.setCasProducerKey(shadowCasPoolKey); - } - // associate this subordinate CAS with the parent CAS - entry.setInputCasReferenceId(inputCasReferenceId); - // Save a Cas Multiplier endpoint where a Free CAS notification will be sent - entry.setFreeCasEndpoint(freeCasEndpoint); - cacheStats(inputCasReferenceId, timeWaitingForCAS, timeToDeserializeCAS); + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), + "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAEE_deserialize_cas_time_FINE", + new Object[] { (double) timeToDeserializeCAS / 1000000.0 }); + } + + // Update Stats + ServicePerformance casStats = getController().getCasStatistics(casReferenceId); + casStats.incrementCasDeserializationTime(timeToDeserializeCAS); + if (getController().isTopLevelComponent()) { + synchronized (mux) { + getController().getServicePerformance().incrementCasDeserializationTime( + timeToDeserializeCAS); + } + } + getController().saveTime(inTime, casReferenceId, getController().getName()); + + if (getController() instanceof AggregateAnalysisEngineController) { + // If the message came from a Cas Multiplier, associate the input/parent CAS id with this CAS + if (aMessageContext.propertyExists(AsynchAEMessage.CasSequence)) { + // Fetch parent CAS id + String inputCasReferenceId = aMessageContext + .getMessageStringProperty(AsynchAEMessage.InputCasReference); + if (shadowCasPoolKey != null) { + // Save the key of the Cas Multiplier in the cache. It will be now known which Cas + // Multiplier produced this CAS + entry.setCasProducerKey(shadowCasPoolKey); + } + // associate this subordinate CAS with the parent CAS + entry.setInputCasReferenceId(inputCasReferenceId); + // Save a Cas Multiplier endpoint where a Free CAS notification will be sent + entry.setFreeCasEndpoint(freeCasEndpoint); + cacheStats(inputCasReferenceId, timeWaitingForCAS, timeToDeserializeCAS); + } else { + cacheStats(casReferenceId, timeWaitingForCAS, timeToDeserializeCAS); + } + DelegateStats stats = new DelegateStats(); + if (entry.getStat() == null) { + entry.setStat(stats); + // Add entry for self (this aggregate). MessageContext.getEndpointName() + // returns the name of the queue receiving the message. + stats.put(getController().getServiceEndpointName(), new TimerStats()); + } else { + if (!stats.containsKey(getController().getServiceEndpointName())) { + stats.put(getController().getServiceEndpointName(), new DelegateStats()); + } + } } else { cacheStats(casReferenceId, timeWaitingForCAS, timeToDeserializeCAS); } - DelegateStats stats = new DelegateStats(); - if (entry.getStat() == null) { - entry.setStat(stats); - // Add entry for self (this aggregate). MessageContext.getEndpointName() - // returns the name of the queue receiving the message. - stats.put(getController().getServiceEndpointName(), new TimerStats()); - } else { - if (!stats.containsKey(getController().getServiceEndpointName())) { - stats.put(getController().getServiceEndpointName(), new DelegateStats()); - } + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), + "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAEE_deserialized_cas_ready_to_process_FINE", + new Object[] { aMessageContext.getEndpoint().getEndpoint() }); } - } else { - cacheStats(casReferenceId, timeWaitingForCAS, timeToDeserializeCAS); - } - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), - "handleProcessRequestWithXMI", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAEE_deserialized_cas_ready_to_process_FINE", - new Object[] { aMessageContext.getEndpoint().getEndpoint() }); + cacheProcessCommandInClientEndpoint(); + + } catch( Exception e) { + if ( cas != null ) { + cas.release(); + } + throw e; } - cacheProcessCommandInClientEndpoint(); return entry; }