Author: cwiklik Date: Thu Sep 22 19:15:05 2011 New Revision: 1174327 URL: http://svn.apache.org/viewvc?rev=1174327&view=rev Log: UIMA-2232 fixes problems associated with handling timeouts
Modified: uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java Modified: uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java?rev=1174327&r1=1174326&r2=1174327&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java (original) +++ uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java Thu Sep 22 19:15:05 2011 @@ -90,7 +90,57 @@ public class TestUimaASExtended extends + System.getProperty("file.separator") + "bin" + System.getProperty("file.separator") + "dd2spring.xsl"); } - + /** + * Tests service quiesce and stop support. This test sets a CasPool to 1 to send just one CAS at a + * time. After the first CAS is sent, a thread is started with a timer to expire before the reply + * is received. When the timer expires, the client initiates quiesceAndStop on the top level + * controller. As part of this, the top level controller stops its listeners on the input queue + * (GetMeta and Process Listeners), and registers a callback with the InProcess cache. When the + * cache is empty, meaning all CASes are processed, the cache notifies the controller which then + * begins the service shutdown. Meanwhile, the client receives a reply for the first CAS, and + * sends a second CAS. This CAS, will remain in the queue as the service has previously stopped + * its listeners. The client times out after 10 seconds and shuts down. + * + * @throws Exception + */ + public void testQuiesceAndStop() throws Exception { + System.out.println("-------------- testQuiesceAndStop -------------"); + BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl(); + Map<String, Object> appCtx = buildContext(String.valueOf(broker.getMasterConnectorURI()), + "TopLevelTaeQueue"); + // Set an explicit process timeout so to test the ping on timeout + appCtx.put(UimaAsynchronousEngine.Timeout, 10000); + appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 300); + appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1); + String containers[] = new String[2]; + containers[0] = deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml"); + containers[1] = deployService(eeUimaEngine, relativePath + + "/Deploy_AggregateAnnotatorWithInternalCM1000Docs.xml"); + spinShutdownThread(eeUimaEngine, 5000, containers, SpringContainerDeployer.QUIESCE_AND_STOP); + runTest(appCtx, eeUimaEngine, String.valueOf(broker.getMasterConnectorURI()), + "TopLevelTaeQueue", 3, EXCEPTION_LATCH); + } + + public void testStopNow() throws Exception { + System.out.println("-------------- testAggregateWithFailedRemoteDelegate -------------"); + BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl(); + String containers[] = new String[2]; + + containers[0] = deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml"); + containers[1] = deployService(eeUimaEngine, relativePath + + "/Deploy_AggregateAnnotatorWithInternalCM1000Docs.xml"); + Map<String, Object> appCtx = buildContext(String.valueOf(broker.getMasterConnectorURI()), + "TopLevelTaeQueue"); + // Set an explicit process timeout so to test the ping on timeout + appCtx.put(UimaAsynchronousEngine.Timeout, 4000); + appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 300); + spinShutdownThread(eeUimaEngine, 3000, containers, SpringContainerDeployer.STOP_NOW); + // send may fail since we forcefully stop the service. Tolerate + // ResourceProcessException + addExceptionToignore(ResourceProcessException.class); + runTest(appCtx, eeUimaEngine, String.valueOf(broker.getMasterConnectorURI()), + "TopLevelTaeQueue", 10, EXCEPTION_LATCH); + } public void testAggregateHttpTunnelling() throws Exception { System.out.println("-------------- testAggregateHttpTunnelling -------------"); // Create Uima EE Client @@ -1228,57 +1278,7 @@ public class TestUimaASExtended extends 1, EXCEPTION_LATCH); } - /** - * Tests service quiesce and stop support. This test sets a CasPool to 1 to send just one CAS at a - * time. After the first CAS is sent, a thread is started with a timer to expire before the reply - * is received. When the timer expires, the client initiates quiesceAndStop on the top level - * controller. As part of this, the top level controller stops its listeners on the input queue - * (GetMeta and Process Listeners), and registers a callback with the InProcess cache. When the - * cache is empty, meaning all CASes are processed, the cache notifies the controller which then - * begins the service shutdown. Meanwhile, the client receives a reply for the first CAS, and - * sends a second CAS. This CAS, will remain in the queue as the service has previously stopped - * its listeners. The client times out after 10 seconds and shuts down. - * - * @throws Exception - */ - public void testQuiesceAndStop() throws Exception { - System.out.println("-------------- testQuiesceAndStop -------------"); - BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl(); - Map<String, Object> appCtx = buildContext(String.valueOf(broker.getMasterConnectorURI()), - "TopLevelTaeQueue"); - // Set an explicit process timeout so to test the ping on timeout - appCtx.put(UimaAsynchronousEngine.Timeout, 10000); - appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 300); - appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1); - String containers[] = new String[2]; - containers[0] = deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml"); - containers[1] = deployService(eeUimaEngine, relativePath - + "/Deploy_AggregateAnnotatorWithInternalCM1000Docs.xml"); - spinShutdownThread(eeUimaEngine, 2000, containers, SpringContainerDeployer.QUIESCE_AND_STOP); - runTest(appCtx, eeUimaEngine, String.valueOf(broker.getMasterConnectorURI()), - "TopLevelTaeQueue", 3, EXCEPTION_LATCH); - } - - public void testStopNow() throws Exception { - System.out.println("-------------- testAggregateWithFailedRemoteDelegate -------------"); - BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl(); - String containers[] = new String[2]; - containers[0] = deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml"); - containers[1] = deployService(eeUimaEngine, relativePath - + "/Deploy_AggregateAnnotatorWithInternalCM1000Docs.xml"); - Map<String, Object> appCtx = buildContext(String.valueOf(broker.getMasterConnectorURI()), - "TopLevelTaeQueue"); - // Set an explicit process timeout so to test the ping on timeout - appCtx.put(UimaAsynchronousEngine.Timeout, 4000); - appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 300); - spinShutdownThread(eeUimaEngine, 3000, containers, SpringContainerDeployer.STOP_NOW); - // send may fail since we forcefully stop the service. Tolerate - // ResourceProcessException - addExceptionToignore(ResourceProcessException.class); - runTest(appCtx, eeUimaEngine, String.valueOf(broker.getMasterConnectorURI()), - "TopLevelTaeQueue", 10, EXCEPTION_LATCH); - } public void testCMAggregateClientStopRequest() throws Exception { System.out.println("-------------- testCMAggregateClientStopRequest -------------"); final BaseUIMAAsynchronousEngine_impl eeUimaEngine = new BaseUIMAAsynchronousEngine_impl(); Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java?rev=1174327&r1=1174326&r2=1174327&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java (original) +++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java Thu Sep 22 19:15:05 2011 @@ -56,7 +56,7 @@ public abstract class Delegate { private Endpoint endpoint; // Timer object to time replies - private Timer timer; + private DelegateTimer timer; // Process Timeout value for this delegate private long casProcessTimeout = 0; @@ -136,6 +136,19 @@ public abstract class Delegate { return endpoint; } + public void cancelTimerForCasOrPurge(String casReferenceId) { + if ( timer != null && timer.getTimerCasId() != null && timer.getTimerCasId().equals(casReferenceId)) { + //System.out.println("\n\n\t Canceled Timer For CAS:"+casReferenceId+" and Restarting Timer for the next oldest CAS in the outstanding list\n\n"); + cancelDelegateTimer(); + // Restart timer for the next older CAS in the oustanding list + restartTimerForOldestCasInOutstandingList(); + } else { + // Given CAS is not the oldest in outstanding list. Purge the CAS from both outstanding and + // pending dispatch lists (if exists). + removeCasFromOutstandingList(casReferenceId); + removeCasFromPendingDispatchList(casReferenceId); + } + } /** * Forces Timer restart for the oldest CAS sitting in the list of CASes pending reply. */ @@ -251,6 +264,7 @@ public abstract class Delegate { new Object[] { getComponentName(), delegateKey, aCasReferenceId, outstandingCasList.size() }); } + //System.out.println("\n\n\t++++++++++++++++++++++++ :::::: Added New CAS to Outstanding List:"+entry.getCasReferenceId()+"\n\tOutstanding:"+toString()); } } } @@ -294,7 +308,7 @@ public abstract class Delegate { * Logs CASes sitting in the list of CASes pending dispatch. These CASes were delayed due to a bad * state of the delegate. */ - private void dumpDelayedList() { + protected void dumpDelayedList() { if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { for (DelegateEntry entry : pendingDispatchList) { UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, this.getClass().getName(), @@ -571,6 +585,12 @@ public abstract class Delegate { public void startGetMetaRequestTimer() { startDelegateTimer(null, AsynchAEMessage.GetMeta); } + /** + * Starts GetMeta Request timer + */ + public void startGetMetaRequestTimer(String casReferenceId) { + startDelegateTimer(casReferenceId, AsynchAEMessage.GetMeta); + } /** * Starts a timer for a given command @@ -581,10 +601,11 @@ public abstract class Delegate { * - command for which the timer is started */ private synchronized void startDelegateTimer(final String aCasReferenceId, final int aCommand) { - final long timeToWait = getTimeoutValueForCommand(aCommand); + + final long timeToWait = getTimeoutValueForCommand(aCommand); Date timeToRun = new Date(System.currentTimeMillis() + timeToWait); - timer = new Timer("Controller:" + getComponentName() + ":Request TimerThread-Endpoint_impl:" - + endpoint + ":" + System.nanoTime() + ":Cmd:" + aCommand); + timer = new DelegateTimer("Controller:" + getComponentName() + ":Request TimerThread-Endpoint_impl:" + + endpoint + ":" + System.nanoTime() + ":Cmd:" + aCommand, true, aCasReferenceId,this); final Delegate delegate = this; timer.schedule(new TimerTask() { public void run() { @@ -593,7 +614,6 @@ public abstract class Delegate { errorContext.add(AsynchAEMessage.Command, aCommand); String enrichedMessage = enrichProcessCASTimeoutMessage(aCommand, aCasReferenceId,timeToWait,"Delegate Service:"+delegateKey+" Has Timed Out While Processing CAS:"+aCasReferenceId ); Exception cause = new MessageTimeoutException(enrichedMessage); -// Exception cause = new MessageTimeoutException("Delegate Service:"+delegateKey+" Has Timed Out While Processing CAS:"+aCasReferenceId); if (AsynchAEMessage.Process == aCommand) { if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(), @@ -614,6 +634,10 @@ public abstract class Delegate { errorContext.add(AsynchAEMessage.ErrorCause, AsynchAEMessage.PingTimeout); } } else if (AsynchAEMessage.GetMeta == aCommand) { + if ( aCasReferenceId != null ) { // true on GetMeta Ping timeout + errorContext.add(AsynchAEMessage.CasReference, aCasReferenceId); + errorContext.add(AsynchAEMessage.ErrorCause, AsynchAEMessage.PingTimeout); + } if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(), "Delegate.TimerTask.run", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, @@ -758,6 +782,19 @@ public abstract class Delegate { return casReferenceId; } } + protected String getDelayedCASes() { + StringBuffer sb = new StringBuffer(); + List<DelegateEntry> copyOfOutstandingCASes = + new ArrayList<DelegateEntry>(outstandingCasList); + for (DelegateEntry entry : copyOfOutstandingCASes) { + if ( entry != null && entry.getCasReferenceId() != null ) { + sb.append("["+entry.getCasReferenceId()+"]"); + } + } + return sb.toString(); + + } + public String toString(){ StringBuffer sb = new StringBuffer(); List<DelegateEntry> copyOfOutstandingCASes = @@ -769,4 +806,21 @@ public abstract class Delegate { } return sb.toString(); } + private class DelegateTimer extends Timer { + String casReferenceId; + Delegate delegate; + public DelegateTimer(String threadName, boolean isDaemon, String casReferenceId, Delegate delegate) { + super(threadName, isDaemon); + this.casReferenceId = casReferenceId; + this.delegate = delegate; + } + public void cancel() { + super.cancel(); + //System.out.println("\n\n---------------------------------- Cancelled Timer on CAS:"+casReferenceId+" .\n\tOutstanding:"+delegate.toString()+"\n\n"); + //Thread.dumpStack(); + } + public String getTimerCasId() { + return casReferenceId; + } + } } Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1174327&r1=1174326&r2=1174327&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java (original) +++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java Thu Sep 22 19:15:05 2011 @@ -834,15 +834,27 @@ public abstract class BaseUIMAAsynchrono if ( !serviceDelegate.isAwaitingPingReply() && sharedConnection.isOpen() ) { serviceDelegate.setAwaitingPingReply(); // Add the cas to a list of CASes pending reply. Also start the timer if necessary - serviceDelegate.addCasToOutstandingList(requestToCache.getCasReferenceId()); + // serviceDelegate.addCasToOutstandingList(requestToCache.getCasReferenceId()); + + // since the service is in time out state, we dont send CASes to it just yet. Instead, place + // a CAS in a pending dispatch list. CASes from this list will be sent once a response to PING + // arrives. + serviceDelegate.addCasToPendingDispatchList(requestToCache.getCasReferenceId()); if ( cpcReadySemaphore.availablePermits() > 0 ) { acquireCpcReadySemaphore(); } // Send PING Request to check delegate's availability sendMetaRequest(); - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendCAS", + serviceDelegate.cancelDelegateTimer(); + // Start a timer for GetMeta ping and associate a cas id + // with this timer. The delegate is currently in a timed out + // state due to a timeout on a CAS with a given casReferenceId. + // + serviceDelegate.startGetMetaRequestTimer(casReferenceId); + + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "sendCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_sending_ping__FINE", new Object[] { serviceDelegate.getKey() }); } @@ -854,7 +866,11 @@ public abstract class BaseUIMAAsynchrono return casReferenceId; } else { // Add to the outstanding list. - serviceDelegate.addCasToOutstandingList(requestToCache.getCasReferenceId()); + // serviceDelegate.addCasToOutstandingList(requestToCache.getCasReferenceId()); + // since the service is in time out state, we dont send CASes to it just yet. Instead, place + // a CAS in a pending dispatch list. CASes from this list will be sent once a response to PING + // arrives. + serviceDelegate.addCasToPendingDispatchList(requestToCache.getCasReferenceId()); return casReferenceId; } } @@ -973,12 +989,16 @@ public abstract class BaseUIMAAsynchrono // reset the state of the service. The client received its ping reply serviceDelegate.resetAwaitingPingReply(); String casReferenceId = null; - if (serviceDelegate.getCasPendingReplyListSize() > 0) { - casReferenceId = serviceDelegate.removeOldestCasFromOutstandingList(); - ClientRequest cachedRequest = (ClientRequest) clientCache.get(casReferenceId); - if (cachedRequest != null) { - sendCAS(cachedRequest.getCAS(), cachedRequest); - } + if (serviceDelegate.getCasPendingReplyListSize() > 0 || serviceDelegate.getCasPendingDispatchListSize() > 0) { + serviceDelegate.restartTimerForOldestCasInOutstandingList(); + // We got a reply to GetMeta ping. Send all CASes that have been + // placed on the pending dispatch queue to a service. + while( (casReferenceId = serviceDelegate.removeOldestFromPendingDispatchList()) != null ) { + ClientRequest cachedRequest = (ClientRequest) clientCache.get(casReferenceId); + if (cachedRequest != null) { + sendCAS(cachedRequest.getCAS(), cachedRequest); + } + } } else { ProcessTrace pt = new ProcessTrace_impl(); UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt); @@ -2038,60 +2058,79 @@ public abstract class BaseUIMAAsynchrono break; case (ProcessTimeout): - ClientRequest cachedRequest = (ClientRequest) clientCache.get(casReferenceId); - if (cachedRequest != null) { - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), - "notifyOnTimout", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_process_timeout_WARNING", new Object[] { anEndpoint, cachedRequest.getHostIpProcessingCAS() }); - } - } else { - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { - // if missing for any reason ... - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), - "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_received_expired_msg_INFO", - new Object[] { anEndpoint, casReferenceId }); - } - return; - } - // Store the total latency for this CAS. The departure time is set right before the CAS - // is sent to a service. - cachedRequest.setTimeWaitingForReply(System.nanoTime() - - cachedRequest.getCASDepartureTime()); - - // mark timeout exception - cachedRequest.setTimeoutException(); - - if (cachedRequest.isSynchronousInvocation()) { - // Signal a thread that we received a reply, if in the map - if (threadMonitorMap.containsKey(cachedRequest.getThreadId())) { - ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest - .getThreadId()); - // Unblock the sending thread so that it can complete processing with an error - if (threadMonitor != null) { - threadMonitor.getMonitor().release(); - cachedRequest.setReceivedProcessCasReply(); // should not be needed - } - } - } else { - // notify the application listener with the error - if ( serviceDelegate.isPingTimeout()) { - exc = new UimaASProcessCasTimeout(new UimaASPingTimeout("UIMA AS Client Ping Time While Waiting For Reply From a Service On Queue:"+anEndpoint)); - serviceDelegate.resetPingTimeout(); - } else { - exc = new UimaASProcessCasTimeout("UIMA AS Client Timed Out Waiting For CAS:"+casReferenceId+ " Reply From a Service On Queue:"+anEndpoint); - } - status.addEventStatus("Process", "Failed", exc); - notifyListeners(aCAS, status, AsynchAEMessage.Process); - } - cachedRequest.removeEntry(casReferenceId); - serviceDelegate.removeCasFromOutstandingList(casReferenceId); - // Check if all replies have been received - long outstandingCasCount = outstandingCasRequests.decrementAndGet(); - if (outstandingCasCount == 0) { - cpcReadySemaphore.release(); - } + if ( casReferenceId != null ) { + ClientRequest cachedRequest = (ClientRequest) clientCache.get(casReferenceId); + if (cachedRequest != null) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), + "notifyOnTimout", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_process_timeout_WARNING", new Object[] { anEndpoint, cachedRequest.getHostIpProcessingCAS() }); + } + } else { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { + // if missing for any reason ... + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), + "notifyOnTimout", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_received_expired_msg_INFO", + new Object[] { anEndpoint, casReferenceId }); + } + return; + } + // Store the total latency for this CAS. The departure time is set right before the CAS + // is sent to a service. + cachedRequest.setTimeWaitingForReply(System.nanoTime() + - cachedRequest.getCASDepartureTime()); + + // mark timeout exception + cachedRequest.setTimeoutException(); + + if (cachedRequest.isSynchronousInvocation()) { + // Signal a thread that we received a reply, if in the map + if (threadMonitorMap.containsKey(cachedRequest.getThreadId())) { + ThreadMonitor threadMonitor = (ThreadMonitor) threadMonitorMap.get(cachedRequest + .getThreadId()); + // Unblock the sending thread so that it can complete processing with an error + if (threadMonitor != null) { + threadMonitor.getMonitor().release(); + cachedRequest.setReceivedProcessCasReply(); // should not be needed + } + } + } else { + // notify the application listener with the error + if ( serviceDelegate.isPingTimeout()) { + exc = new UimaASProcessCasTimeout(new UimaASPingTimeout("UIMA AS Client Ping Time While Waiting For Reply From a Service On Queue:"+anEndpoint)); + serviceDelegate.resetPingTimeout(); + } else { + exc = new UimaASProcessCasTimeout("UIMA AS Client Timed Out Waiting For CAS:"+casReferenceId+ " Reply From a Service On Queue:"+anEndpoint); + } + status.addEventStatus("Process", "Failed", exc); + notifyListeners(aCAS, status, AsynchAEMessage.Process); + } + boolean isSynchronousCall = cachedRequest.isSynchronousInvocation(); + + cachedRequest.removeEntry(casReferenceId); + serviceDelegate.removeCasFromOutstandingList(casReferenceId); + // Check if all replies have been received + long outstandingCasCount = outstandingCasRequests.decrementAndGet(); + if (outstandingCasCount == 0) { + cpcReadySemaphore.release(); + } + // + if ( !isSynchronousCall && serviceDelegate.getCasPendingReplyListSize() > 0) { + String nextOutstandingCasReferenceId = + serviceDelegate.getOldestCasIdFromOutstandingList(); + if ( nextOutstandingCasReferenceId != null ) { + cachedRequest = (ClientRequest) clientCache.get(nextOutstandingCasReferenceId); + try { + sendCAS(cachedRequest.getCAS()); + } catch( Exception e) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), + "notifyOnTimout", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAEE_exception__WARNING", e); + } + } + } + } break; } // case } Modified: uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java?rev=1174327&r1=1174326&r2=1174327&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java (original) +++ uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java Thu Sep 22 19:15:05 2011 @@ -89,7 +89,8 @@ public class ClientServiceDelegate exten String casReferenceId = null; CAS cas = null; ClientRequest cachedRequest = null; - + casReferenceId = (String) errorContext.get(AsynchAEMessage.CasReference); + synchronized(errorMux) { if (!clientUimaAsEngine.running) { cancelDelegateTimer(); @@ -101,7 +102,7 @@ public class ClientServiceDelegate exten if (e instanceof MessageTimeoutException) { switch (command) { case AsynchAEMessage.Process: - casReferenceId = (String) errorContext.get(AsynchAEMessage.CasReference); + //casReferenceId = (String) errorContext.get(AsynchAEMessage.CasReference); if (casReferenceId != null) { cachedRequest = (ClientRequest) clientUimaAsEngine.clientCache.get(casReferenceId); if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING) @@ -145,25 +146,27 @@ public class ClientServiceDelegate exten case AsynchAEMessage.GetMeta: if (isAwaitingPingReply()) { + + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), + "handleError", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_client_ping_timed_out__WARNING", new Object[] { clientUimaAsEngine.getEndPointName()}); + } + super.resetAwaitingPingReply(); + // Handling a Ping timeout, treat it as if it was a Process timeout. clientUimaAsEngine.notifyOnTimout(cas, clientUimaAsEngine.getEndPointName(), - BaseUIMAAsynchronousEngineCommon_impl.PingTimeout, casReferenceId); + BaseUIMAAsynchronousEngineCommon_impl.ProcessTimeout, casReferenceId); } else { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), + "handleError", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_meta_timeout_WARNING", new Object[] { getKey() }); + } // Notifies Listeners and removes ClientRequest instance from the client cache clientUimaAsEngine.notifyOnTimout(cas, clientUimaAsEngine.getEndPointName(), BaseUIMAAsynchronousEngineCommon_impl.MetadataTimeout, casReferenceId); clientUimaAsEngine.clientSideJmxStats.incrementMetaTimeoutErrorCount(); } - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), - "handleError", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_meta_timeout_WARNING", new Object[] { getKey() }); - } - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), - "handleError", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_service_not_responding_to_ping__WARNING", new Object[] { clientUimaAsEngine.getEndPointName()}); - } - clientUimaAsEngine.stop(); break; case AsynchAEMessage.CollectionProcessComplete: @@ -188,7 +191,7 @@ public class ClientServiceDelegate exten public String enrichProcessCASTimeoutMessage(int aCommand, String casReferenceId, long timeToWait, String timeoutMessage) { StringBuffer sb = new StringBuffer(timeoutMessage); try { - if ( clientUimaAsEngine.getCache().containsKey(casReferenceId) ) { + if ( casReferenceId != null && clientUimaAsEngine.getCache().containsKey(casReferenceId) ) { ClientRequest cr = (ClientRequest)clientUimaAsEngine.getCache().get(casReferenceId); if ( cr != null ) {