Author: cwiklik
Date: Thu Sep 22 19:15:05 2011
New Revision: 1174327

URL: http://svn.apache.org/viewvc?rev=1174327&view=rev
Log:
UIMA-2232 fixes problems associated with handling timeouts 

Modified:
    
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
    
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
    
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java?rev=1174327&r1=1174326&r2=1174327&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
 Thu Sep 22 19:15:05 2011
@@ -90,7 +90,57 @@ public class TestUimaASExtended extends 
             + System.getProperty("file.separator") + "bin" + 
System.getProperty("file.separator")
             + "dd2spring.xsl");
   }
-  
+  /**
+   * Tests service quiesce and stop support. This test sets a CasPool to 1 to 
send just one CAS at a
+   * time. After the first CAS is sent, a thread is started with a timer to 
expire before the reply
+   * is received. When the timer expires, the client initiates quiesceAndStop 
on the top level
+   * controller. As part of this, the top level controller stops its listeners 
on the input queue
+   * (GetMeta and Process Listeners), and registers a callback with the 
InProcess cache. When the
+   * cache is empty, meaning all CASes are processed, the cache notifies the 
controller which then
+   * begins the service shutdown. Meanwhile, the client receives a reply for 
the first CAS, and
+   * sends a second CAS. This CAS, will remain in the queue as the service has 
previously stopped
+   * its listeners. The client times out after 10 seconds and shuts down.
+   * 
+   * @throws Exception
+   */
+  public void testQuiesceAndStop() throws Exception {
+    System.out.println("-------------- testQuiesceAndStop -------------");
+    BaseUIMAAsynchronousEngine_impl eeUimaEngine = new 
BaseUIMAAsynchronousEngine_impl();
+    Map<String, Object> appCtx = 
buildContext(String.valueOf(broker.getMasterConnectorURI()),
+            "TopLevelTaeQueue");
+    // Set an explicit process timeout so to test the ping on timeout
+    appCtx.put(UimaAsynchronousEngine.Timeout, 10000);
+    appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 300);
+    appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1);
+    String containers[] = new String[2];
+    containers[0] = deployService(eeUimaEngine, relativePath + 
"/Deploy_NoOpAnnotator.xml");
+    containers[1] =  deployService(eeUimaEngine, relativePath
+            + "/Deploy_AggregateAnnotatorWithInternalCM1000Docs.xml");
+    spinShutdownThread(eeUimaEngine, 5000, containers, 
SpringContainerDeployer.QUIESCE_AND_STOP);
+    runTest(appCtx, eeUimaEngine, 
String.valueOf(broker.getMasterConnectorURI()),
+            "TopLevelTaeQueue", 3, EXCEPTION_LATCH);
+  }
+
+  public void testStopNow() throws Exception {
+    System.out.println("-------------- testAggregateWithFailedRemoteDelegate 
-------------");
+    BaseUIMAAsynchronousEngine_impl eeUimaEngine = new 
BaseUIMAAsynchronousEngine_impl();
+    String containers[] = new String[2];
+
+    containers[0] = deployService(eeUimaEngine, relativePath + 
"/Deploy_NoOpAnnotator.xml");
+    containers[1] = deployService(eeUimaEngine, relativePath
+            + "/Deploy_AggregateAnnotatorWithInternalCM1000Docs.xml");
+    Map<String, Object> appCtx = 
buildContext(String.valueOf(broker.getMasterConnectorURI()),
+            "TopLevelTaeQueue");
+    // Set an explicit process timeout so to test the ping on timeout
+    appCtx.put(UimaAsynchronousEngine.Timeout, 4000);
+    appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 300);
+    spinShutdownThread(eeUimaEngine, 3000, containers, 
SpringContainerDeployer.STOP_NOW);
+    //  send may fail since we forcefully stop the service. Tolerate
+    //  ResourceProcessException
+    addExceptionToignore(ResourceProcessException.class); 
+    runTest(appCtx, eeUimaEngine, 
String.valueOf(broker.getMasterConnectorURI()),
+            "TopLevelTaeQueue", 10, EXCEPTION_LATCH);
+  }
   public void testAggregateHttpTunnelling() throws Exception {
     System.out.println("-------------- testAggregateHttpTunnelling 
-------------");
     // Create Uima EE Client
@@ -1228,57 +1278,7 @@ public class TestUimaASExtended extends 
             1, EXCEPTION_LATCH);
   }
 
-  /**
-   * Tests service quiesce and stop support. This test sets a CasPool to 1 to 
send just one CAS at a
-   * time. After the first CAS is sent, a thread is started with a timer to 
expire before the reply
-   * is received. When the timer expires, the client initiates quiesceAndStop 
on the top level
-   * controller. As part of this, the top level controller stops its listeners 
on the input queue
-   * (GetMeta and Process Listeners), and registers a callback with the 
InProcess cache. When the
-   * cache is empty, meaning all CASes are processed, the cache notifies the 
controller which then
-   * begins the service shutdown. Meanwhile, the client receives a reply for 
the first CAS, and
-   * sends a second CAS. This CAS, will remain in the queue as the service has 
previously stopped
-   * its listeners. The client times out after 10 seconds and shuts down.
-   * 
-   * @throws Exception
-   */
-  public void testQuiesceAndStop() throws Exception {
-    System.out.println("-------------- testQuiesceAndStop -------------");
-    BaseUIMAAsynchronousEngine_impl eeUimaEngine = new 
BaseUIMAAsynchronousEngine_impl();
-    Map<String, Object> appCtx = 
buildContext(String.valueOf(broker.getMasterConnectorURI()),
-            "TopLevelTaeQueue");
-    // Set an explicit process timeout so to test the ping on timeout
-    appCtx.put(UimaAsynchronousEngine.Timeout, 10000);
-    appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 300);
-    appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1);
-    String containers[] = new String[2];
-    containers[0] = deployService(eeUimaEngine, relativePath + 
"/Deploy_NoOpAnnotator.xml");
-    containers[1] =  deployService(eeUimaEngine, relativePath
-            + "/Deploy_AggregateAnnotatorWithInternalCM1000Docs.xml");
-    spinShutdownThread(eeUimaEngine, 2000, containers, 
SpringContainerDeployer.QUIESCE_AND_STOP);
-    runTest(appCtx, eeUimaEngine, 
String.valueOf(broker.getMasterConnectorURI()),
-            "TopLevelTaeQueue", 3, EXCEPTION_LATCH);
-  }
-
-  public void testStopNow() throws Exception {
-    System.out.println("-------------- testAggregateWithFailedRemoteDelegate 
-------------");
-    BaseUIMAAsynchronousEngine_impl eeUimaEngine = new 
BaseUIMAAsynchronousEngine_impl();
-    String containers[] = new String[2];
 
-    containers[0] = deployService(eeUimaEngine, relativePath + 
"/Deploy_NoOpAnnotator.xml");
-    containers[1] = deployService(eeUimaEngine, relativePath
-            + "/Deploy_AggregateAnnotatorWithInternalCM1000Docs.xml");
-    Map<String, Object> appCtx = 
buildContext(String.valueOf(broker.getMasterConnectorURI()),
-            "TopLevelTaeQueue");
-    // Set an explicit process timeout so to test the ping on timeout
-    appCtx.put(UimaAsynchronousEngine.Timeout, 4000);
-    appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, 300);
-    spinShutdownThread(eeUimaEngine, 3000, containers, 
SpringContainerDeployer.STOP_NOW);
-    //  send may fail since we forcefully stop the service. Tolerate
-    //  ResourceProcessException
-    addExceptionToignore(ResourceProcessException.class); 
-    runTest(appCtx, eeUimaEngine, 
String.valueOf(broker.getMasterConnectorURI()),
-            "TopLevelTaeQueue", 10, EXCEPTION_LATCH);
-  }
   public void testCMAggregateClientStopRequest() throws Exception {
     System.out.println("-------------- testCMAggregateClientStopRequest 
-------------");
     final BaseUIMAAsynchronousEngine_impl eeUimaEngine = new 
BaseUIMAAsynchronousEngine_impl();

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java?rev=1174327&r1=1174326&r2=1174327&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/delegate/Delegate.java
 Thu Sep 22 19:15:05 2011
@@ -56,7 +56,7 @@ public abstract class Delegate {
   private Endpoint endpoint;
 
   // Timer object to time replies
-  private Timer timer;
+  private DelegateTimer timer;
 
   // Process Timeout value for this delegate
   private long casProcessTimeout = 0;
@@ -136,6 +136,19 @@ public abstract class Delegate {
     return endpoint;
   }
 
+  public void cancelTimerForCasOrPurge(String casReferenceId) {
+         if ( timer != null && timer.getTimerCasId() != null && 
timer.getTimerCasId().equals(casReferenceId)) {
+                 //System.out.println("\n\n\t Canceled Timer For 
CAS:"+casReferenceId+" and Restarting Timer for the next oldest CAS in the 
outstanding list\n\n");
+                 cancelDelegateTimer();
+                 //    Restart timer for the next older CAS in the oustanding 
list
+                 restartTimerForOldestCasInOutstandingList();
+         } else {
+                 // Given CAS is not the oldest in outstanding list. Purge the 
CAS from both outstanding and
+                 // pending dispatch lists (if exists).
+                 removeCasFromOutstandingList(casReferenceId);
+                 removeCasFromPendingDispatchList(casReferenceId);
+         }
+  }
   /**
    * Forces Timer restart for the oldest CAS sitting in the list of CASes 
pending reply.
    */
@@ -251,6 +264,7 @@ public abstract class Delegate {
                   new Object[] { getComponentName(), delegateKey, 
aCasReferenceId,
                       outstandingCasList.size() });
         }
+        //System.out.println("\n\n\t++++++++++++++++++++++++ :::::: Added New 
CAS to Outstanding 
List:"+entry.getCasReferenceId()+"\n\tOutstanding:"+toString());
       }
     }
   }
@@ -294,7 +308,7 @@ public abstract class Delegate {
    * Logs CASes sitting in the list of CASes pending dispatch. These CASes 
were delayed due to a bad
    * state of the delegate.
    */
-  private void dumpDelayedList() {
+  protected void dumpDelayedList() {
     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
       for (DelegateEntry entry : pendingDispatchList) {
         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, 
this.getClass().getName(),
@@ -571,6 +585,12 @@ public abstract class Delegate {
   public void startGetMetaRequestTimer() {
     startDelegateTimer(null, AsynchAEMessage.GetMeta);
   }
+  /**
+   * Starts GetMeta Request timer
+   */
+  public void startGetMetaRequestTimer(String casReferenceId) {
+    startDelegateTimer(casReferenceId, AsynchAEMessage.GetMeta);
+  }
 
   /**
    * Starts a timer for a given command
@@ -581,10 +601,11 @@ public abstract class Delegate {
    *          - command for which the timer is started
    */
   private synchronized void startDelegateTimer(final String aCasReferenceId, 
final int aCommand) {
-    final long timeToWait = getTimeoutValueForCommand(aCommand);
+    
+         final long timeToWait = getTimeoutValueForCommand(aCommand);
     Date timeToRun = new Date(System.currentTimeMillis() + timeToWait);
-    timer = new Timer("Controller:" + getComponentName() + ":Request 
TimerThread-Endpoint_impl:"
-            + endpoint + ":" + System.nanoTime() + ":Cmd:" + aCommand);
+    timer = new DelegateTimer("Controller:" + getComponentName() + ":Request 
TimerThread-Endpoint_impl:"
+            + endpoint + ":" + System.nanoTime() + ":Cmd:" + aCommand, true, 
aCasReferenceId,this);
     final Delegate delegate = this;
     timer.schedule(new TimerTask() {
       public void run() {
@@ -593,7 +614,6 @@ public abstract class Delegate {
         errorContext.add(AsynchAEMessage.Command, aCommand);
         String enrichedMessage = enrichProcessCASTimeoutMessage(aCommand, 
aCasReferenceId,timeToWait,"Delegate Service:"+delegateKey+" Has Timed Out 
While Processing CAS:"+aCasReferenceId );
         Exception cause = new MessageTimeoutException(enrichedMessage);
-//        Exception cause = new MessageTimeoutException("Delegate 
Service:"+delegateKey+" Has Timed Out While Processing CAS:"+aCasReferenceId);
         if (AsynchAEMessage.Process == aCommand) {
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
             UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
this.getClass().getName(),
@@ -614,6 +634,10 @@ public abstract class Delegate {
             errorContext.add(AsynchAEMessage.ErrorCause, 
AsynchAEMessage.PingTimeout);
           }
         } else if (AsynchAEMessage.GetMeta == aCommand) {
+               if ( aCasReferenceId != null ) {  // true on GetMeta Ping 
timeout
+               errorContext.add(AsynchAEMessage.CasReference, aCasReferenceId);
+                errorContext.add(AsynchAEMessage.ErrorCause, 
AsynchAEMessage.PingTimeout);
+               }
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
             UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
this.getClass().getName(),
                     "Delegate.TimerTask.run", 
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
@@ -758,6 +782,19 @@ public abstract class Delegate {
       return casReferenceId;
     }
   }
+  protected String getDelayedCASes() {
+           StringBuffer sb = new StringBuffer();
+           List<DelegateEntry> copyOfOutstandingCASes = 
+             new ArrayList<DelegateEntry>(outstandingCasList);
+           for (DelegateEntry entry : copyOfOutstandingCASes) {
+             if ( entry != null && entry.getCasReferenceId() != null ) {
+               sb.append("["+entry.getCasReferenceId()+"]");
+             } 
+           }
+           return sb.toString();
+
+  }
+  
   public String toString(){
     StringBuffer sb = new StringBuffer();
     List<DelegateEntry> copyOfOutstandingCASes = 
@@ -769,4 +806,21 @@ public abstract class Delegate {
     }
     return sb.toString();
   }
+  private class DelegateTimer extends Timer {
+         String casReferenceId;
+         Delegate delegate;
+         public DelegateTimer(String threadName, boolean isDaemon, String 
casReferenceId, Delegate delegate) {
+                 super(threadName, isDaemon);
+                 this.casReferenceId = casReferenceId;
+                 this.delegate = delegate;
+         }
+         public void cancel() {
+                 super.cancel();
+             //System.out.println("\n\n---------------------------------- 
Cancelled Timer on CAS:"+casReferenceId+" 
.\n\tOutstanding:"+delegate.toString()+"\n\n");
+             //Thread.dumpStack();
+         }
+         public String getTimerCasId() {
+                 return casReferenceId;
+         }
+  }
 }

Modified: 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java?rev=1174327&r1=1174326&r2=1174327&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngineCommon_impl.java
 Thu Sep 22 19:15:05 2011
@@ -834,15 +834,27 @@ public abstract class BaseUIMAAsynchrono
           if ( !serviceDelegate.isAwaitingPingReply() && 
sharedConnection.isOpen() ) {
             serviceDelegate.setAwaitingPingReply();
             // Add the cas to a list of CASes pending reply. Also start the 
timer if necessary
-            
serviceDelegate.addCasToOutstandingList(requestToCache.getCasReferenceId());
+                       // 
serviceDelegate.addCasToOutstandingList(requestToCache.getCasReferenceId());
+                 
+                 // since the service is in time out state, we dont send CASes 
to it just yet. Instead, place
+                 // a CAS in a pending dispatch list. CASes from this list 
will be sent once a response to PING
+                 // arrives.
+            
serviceDelegate.addCasToPendingDispatchList(requestToCache.getCasReferenceId());
             if ( cpcReadySemaphore.availablePermits() > 0 ) {
               acquireCpcReadySemaphore();
             }
 
             // Send PING Request to check delegate's availability
             sendMetaRequest();
-            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, 
CLASS_NAME.getName(), "sendCAS",
+            serviceDelegate.cancelDelegateTimer();
+            // Start a timer for GetMeta ping and associate a cas id
+            // with this timer. The delegate is currently in a timed out
+            // state due to a timeout on a CAS with a given casReferenceId.
+            //  
+            serviceDelegate.startGetMetaRequestTimer(casReferenceId);
+            
+            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, 
CLASS_NAME.getName(), "sendCAS",
                       JmsConstants.JMS_LOG_RESOURCE_BUNDLE, 
"UIMAJMS_client_sending_ping__FINE",
                       new Object[] { serviceDelegate.getKey() });
             }
@@ -854,7 +866,11 @@ public abstract class BaseUIMAAsynchrono
               return casReferenceId;
             } else {
               //  Add to the outstanding list.  
-              
serviceDelegate.addCasToOutstandingList(requestToCache.getCasReferenceId());
+                         //  
serviceDelegate.addCasToOutstandingList(requestToCache.getCasReferenceId());
+                 // since the service is in time out state, we dont send CASes 
to it just yet. Instead, place
+                 // a CAS in a pending dispatch list. CASes from this list 
will be sent once a response to PING
+                 // arrives.
+              
serviceDelegate.addCasToPendingDispatchList(requestToCache.getCasReferenceId());
               return casReferenceId;
             }
           }
@@ -973,12 +989,16 @@ public abstract class BaseUIMAAsynchrono
       //  reset the state of the service. The client received its ping reply  
       serviceDelegate.resetAwaitingPingReply();
       String casReferenceId = null;
-      if (serviceDelegate.getCasPendingReplyListSize() > 0) {
-        casReferenceId = serviceDelegate.removeOldestCasFromOutstandingList(); 
       
-        ClientRequest cachedRequest = (ClientRequest) 
clientCache.get(casReferenceId);
-        if (cachedRequest != null) {
-          sendCAS(cachedRequest.getCAS(), cachedRequest);
-        }
+      if (serviceDelegate.getCasPendingReplyListSize() > 0 || 
serviceDelegate.getCasPendingDispatchListSize() > 0) {
+         serviceDelegate.restartTimerForOldestCasInOutstandingList();
+          //   We got a reply to GetMeta ping. Send all CASes that have been
+          //    placed on the pending dispatch queue to a service.
+         while( (casReferenceId = 
serviceDelegate.removeOldestFromPendingDispatchList()) != null ) {
+               ClientRequest cachedRequest = (ClientRequest) 
clientCache.get(casReferenceId);
+               if (cachedRequest != null) {
+                 sendCAS(cachedRequest.getCAS(), cachedRequest);
+               }
+         }
       } else {
         ProcessTrace pt = new ProcessTrace_impl();
         UimaASProcessStatusImpl status = new UimaASProcessStatusImpl(pt);
@@ -2038,60 +2058,79 @@ public abstract class BaseUIMAAsynchrono
         break;
 
       case (ProcessTimeout):
-        ClientRequest cachedRequest = (ClientRequest) 
clientCache.get(casReferenceId);
-        if (cachedRequest != null) {
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
-                  "notifyOnTimout", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                  "UIMAJMS_process_timeout_WARNING", new Object[] { 
anEndpoint, cachedRequest.getHostIpProcessingCAS() });
-          }
-        } else {
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-            // if missing for any reason ...
-            UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, 
CLASS_NAME.getName(),
-                    "handleProcessReply", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_received_expired_msg_INFO",
-                    new Object[] { anEndpoint, casReferenceId });
-          }
-          return;
-        }
-        // Store the total latency for this CAS. The departure time is set 
right before the CAS
-        // is sent to a service.
-        cachedRequest.setTimeWaitingForReply(System.nanoTime()
-                - cachedRequest.getCASDepartureTime());
-
-        // mark timeout exception
-        cachedRequest.setTimeoutException();
-
-        if (cachedRequest.isSynchronousInvocation()) {
-          // Signal a thread that we received a reply, if in the map
-          if (threadMonitorMap.containsKey(cachedRequest.getThreadId())) {
-            ThreadMonitor threadMonitor = (ThreadMonitor) 
threadMonitorMap.get(cachedRequest
-                    .getThreadId());
-            // Unblock the sending thread so that it can complete processing 
with an error
-            if (threadMonitor != null) {
-              threadMonitor.getMonitor().release();
-              cachedRequest.setReceivedProcessCasReply(); // should not be 
needed
-            }
-          }
-        } else {
-          // notify the application listener with the error
-          if ( serviceDelegate.isPingTimeout()) {
-            exc = new UimaASProcessCasTimeout(new UimaASPingTimeout("UIMA AS 
Client Ping Time While Waiting For Reply From a Service On Queue:"+anEndpoint));
-            serviceDelegate.resetPingTimeout();
-          } else {
-            exc = new UimaASProcessCasTimeout("UIMA AS Client Timed Out 
Waiting For CAS:"+casReferenceId+ " Reply From a Service On Queue:"+anEndpoint);
-          }
-          status.addEventStatus("Process", "Failed", exc);
-          notifyListeners(aCAS, status, AsynchAEMessage.Process);
-        }
-        cachedRequest.removeEntry(casReferenceId);
-        serviceDelegate.removeCasFromOutstandingList(casReferenceId);
-        // Check if all replies have been received
-        long outstandingCasCount = outstandingCasRequests.decrementAndGet();
-        if (outstandingCasCount == 0) {
-          cpcReadySemaphore.release();
-        }
+         if ( casReferenceId != null ) {
+               ClientRequest cachedRequest = (ClientRequest) 
clientCache.get(casReferenceId);
+               if (cachedRequest != null) {
+                 if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+                   UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
+                         "notifyOnTimout", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                         "UIMAJMS_process_timeout_WARNING", new Object[] { 
anEndpoint, cachedRequest.getHostIpProcessingCAS() });
+                 }
+               } else {
+                 if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                   // if missing for any reason ...
+                   UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, 
CLASS_NAME.getName(),
+                           "notifyOnTimout", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                           "UIMAJMS_received_expired_msg_INFO",
+                           new Object[] { anEndpoint, casReferenceId });
+                 }
+                 return;
+               }
+               // Store the total latency for this CAS. The departure time is 
set right before the CAS
+               // is sent to a service.
+               cachedRequest.setTimeWaitingForReply(System.nanoTime()
+                       - cachedRequest.getCASDepartureTime());
+
+               // mark timeout exception
+               cachedRequest.setTimeoutException();
+
+               if (cachedRequest.isSynchronousInvocation()) {
+                 // Signal a thread that we received a reply, if in the map
+                 if 
(threadMonitorMap.containsKey(cachedRequest.getThreadId())) {
+                   ThreadMonitor threadMonitor = (ThreadMonitor) 
threadMonitorMap.get(cachedRequest
+                           .getThreadId());
+                   // Unblock the sending thread so that it can complete 
processing with an error
+                   if (threadMonitor != null) {
+                     threadMonitor.getMonitor().release();
+                     cachedRequest.setReceivedProcessCasReply(); // should not 
be needed
+                   }
+                 }
+               } else {
+                 // notify the application listener with the error
+                 if ( serviceDelegate.isPingTimeout()) {
+                   exc = new UimaASProcessCasTimeout(new 
UimaASPingTimeout("UIMA AS Client Ping Time While Waiting For Reply From a 
Service On Queue:"+anEndpoint));
+                   serviceDelegate.resetPingTimeout();
+                 } else {
+                   exc = new UimaASProcessCasTimeout("UIMA AS Client Timed Out 
Waiting For CAS:"+casReferenceId+ " Reply From a Service On Queue:"+anEndpoint);
+                 }
+                 status.addEventStatus("Process", "Failed", exc);
+                 notifyListeners(aCAS, status, AsynchAEMessage.Process);
+               }
+               boolean isSynchronousCall = 
cachedRequest.isSynchronousInvocation();
+               
+               cachedRequest.removeEntry(casReferenceId);
+               serviceDelegate.removeCasFromOutstandingList(casReferenceId);
+               // Check if all replies have been received
+               long outstandingCasCount = 
outstandingCasRequests.decrementAndGet();
+               if (outstandingCasCount == 0) {
+                 cpcReadySemaphore.release();
+               }
+               //      
+               if ( !isSynchronousCall && 
serviceDelegate.getCasPendingReplyListSize() > 0) {
+                   String nextOutstandingCasReferenceId = 
+                               
serviceDelegate.getOldestCasIdFromOutstandingList();
+                       if ( nextOutstandingCasReferenceId != null ) {
+                               cachedRequest = (ClientRequest) 
clientCache.get(nextOutstandingCasReferenceId);
+                               try {
+                               sendCAS(cachedRequest.getCAS());
+                               } catch( Exception e) {
+                                       
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(),
+                                   "notifyOnTimout", 
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                                   "UIMAEE_exception__WARNING", e);
+                               }
+                       }
+               }
+         }
         break;
     } // case
   }

Modified: 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java?rev=1174327&r1=1174326&r2=1174327&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-jms/src/main/java/org/apache/uima/adapter/jms/client/ClientServiceDelegate.java
 Thu Sep 22 19:15:05 2011
@@ -89,7 +89,8 @@ public class ClientServiceDelegate exten
     String casReferenceId = null;
     CAS cas = null;
     ClientRequest cachedRequest = null;
-    
+    casReferenceId = (String) errorContext.get(AsynchAEMessage.CasReference);
+
     synchronized(errorMux) {
       if (!clientUimaAsEngine.running) {
         cancelDelegateTimer();
@@ -101,7 +102,7 @@ public class ClientServiceDelegate exten
         if (e instanceof MessageTimeoutException) {
           switch (command) {
             case AsynchAEMessage.Process:
-              casReferenceId = (String) 
errorContext.get(AsynchAEMessage.CasReference);
+              //casReferenceId = (String) 
errorContext.get(AsynchAEMessage.CasReference);
               if (casReferenceId != null) {
                 cachedRequest = (ClientRequest) 
clientUimaAsEngine.clientCache.get(casReferenceId);
                 if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)
@@ -145,25 +146,27 @@ public class ClientServiceDelegate exten
 
             case AsynchAEMessage.GetMeta:
               if (isAwaitingPingReply()) {
+                 
+                if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
+                        "handleError", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                        "UIMAJMS_client_ping_timed_out__WARNING", new Object[] 
{ clientUimaAsEngine.getEndPointName()});
+                }
+                super.resetAwaitingPingReply();
+                // Handling a Ping timeout, treat it as if it was a Process 
timeout. 
                 clientUimaAsEngine.notifyOnTimout(cas, 
clientUimaAsEngine.getEndPointName(),
-                        BaseUIMAAsynchronousEngineCommon_impl.PingTimeout, 
casReferenceId);
+                               
BaseUIMAAsynchronousEngineCommon_impl.ProcessTimeout, casReferenceId);
               } else {
+                  if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+                      UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
+                              "handleError", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                              "UIMAJMS_meta_timeout_WARNING", new Object[] { 
getKey() });
+                    }
                 // Notifies Listeners and removes ClientRequest instance from 
the client cache
                 clientUimaAsEngine.notifyOnTimout(cas, 
clientUimaAsEngine.getEndPointName(),
                         BaseUIMAAsynchronousEngineCommon_impl.MetadataTimeout, 
casReferenceId);
                 
clientUimaAsEngine.clientSideJmxStats.incrementMetaTimeoutErrorCount();
               }
-              if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
-                        "handleError", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                        "UIMAJMS_meta_timeout_WARNING", new Object[] { 
getKey() });
-              }
-              if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-                  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
-                          "handleError", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                          "UIMAJMS_service_not_responding_to_ping__WARNING", 
new Object[] { clientUimaAsEngine.getEndPointName()});
-                }
-              clientUimaAsEngine.stop();
               break;
 
             case AsynchAEMessage.CollectionProcessComplete:
@@ -188,7 +191,7 @@ public class ClientServiceDelegate exten
   public String enrichProcessCASTimeoutMessage(int aCommand, String 
casReferenceId, long timeToWait, String timeoutMessage) {
     StringBuffer sb = new StringBuffer(timeoutMessage);
     try {
-      if ( clientUimaAsEngine.getCache().containsKey(casReferenceId) ) {
+      if ( casReferenceId != null && 
clientUimaAsEngine.getCache().containsKey(casReferenceId) ) {
         ClientRequest cr = 
           (ClientRequest)clientUimaAsEngine.getCache().get(casReferenceId);
         if ( cr != null ) {


Reply via email to