Author: cwiklik
Date: Sat Nov  2 17:21:10 2013
New Revision: 1538234

URL: http://svn.apache.org/r1538234
Log:
UIMA-3383 Refactored code related to handling of Quiesce requests. Both 'q'- and 
CTRL-C-initiated quiesce now wait for a notification from InProcessCache when it 
becomes empty. The process is allowed to exit only once the cache is empty.

Modified:
    
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java
    
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
    
uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties

Modified: 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java?rev=1538234&r1=1538233&r2=1538234&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/service/UIMA_Service.java
 Sat Nov  2 17:21:10 2013
@@ -23,12 +23,8 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InvalidClassException;
-import java.net.Socket;
 
 import org.apache.uima.UIMAFramework;
-import org.apache.uima.UimaContext;
-import org.apache.uima.UimaContextAdmin;
-import org.apache.uima.aae.UimaASApplicationEvent;
 import org.apache.uima.aae.UimaASApplicationExitEvent;
 import org.apache.uima.aae.UimaAsVersion;
 import org.apache.uima.aae.controller.AnalysisEngineController;
@@ -395,7 +391,7 @@ public class UIMA_Service implements App
     } else if ( event instanceof UimaASApplicationExitEvent ) {
        System.out.println("Service Wrapper Received UimaASApplicationEvent. 
Message:"+event.getSource());
     } else if ( event instanceof ContextStoppedEvent ){ // Spring has been 
shutdown
-       System.exit(0);
+       
     }
   }
 
@@ -490,7 +486,6 @@ public class UIMA_Service implements App
   }
   
   static class ServiceShutdownHook extends Thread {
-
     public SpringContainerDeployer serviceDeployer;
 
     public ServiceShutdownHook(SpringContainerDeployer serviceDeployer) {
@@ -501,18 +496,19 @@ public class UIMA_Service implements App
       try {
        AnalysisEngineController topLevelController = 
serviceDeployer.getTopLevelController();
        if (topLevelController != null && !topLevelController.isStopped() ) {
-         UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
+          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
                 "run", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                 "UIMAJMS_caught_signal__INFO", new Object[] { 
topLevelController.getComponentName() });
-           serviceDeployer.undeploy(SpringContainerDeployer.QUIESCE_AND_STOP);
-         }
+         serviceDeployer.undeploy(SpringContainerDeployer.QUIESCE_AND_STOP);
+         } 
       } catch( Exception e) {
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
                   "run", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
                   "UIMAJMS_exception__WARNING", e);
         }
-      }
+      } 
     }
+
   } 
 }

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java?rev=1538234&r1=1538233&r2=1538234&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/InProcessCache.java
 Sat Nov  2 17:21:10 2013
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.Semaphore;
 
 import org.apache.uima.UIMAFramework;
+import org.apache.uima.aae.controller.BaseAnalysisEngineController;
 import org.apache.uima.aae.controller.Endpoint;
 import org.apache.uima.aae.controller.EventSubscriber;
 import org.apache.uima.aae.error.AsynchAEException;
@@ -64,6 +65,18 @@ public class InProcessCache implements I
 
   int size = 0;
 
+  private BaseAnalysisEngineController controller;
+  
+  /**
+         Register controller to call when the cache becomes empty.
+    This call is made when the controller enters quiesce
+    state. In this state the controller waits for 
+    the cache to send notification when all CASes have been 
+    processed.
+  **/
+  public void registerController(BaseAnalysisEngineController ctrl) {
+    controller = ctrl;
+  }
   public void registerCallbackWhenCacheEmpty(EventSubscriber aController) {
     registerCallbackWhenCacheEmpty(aController, 0);
   }
@@ -271,7 +284,10 @@ public class InProcessCache implements I
         ((EventSubscriber) callbackListeners.get(i)).onCacheEmpty();
       }
     }
-
+    if (cache.size() == 0 && controller != null ) {
+      // unblock the controller waiting in quiesceAndStop
+      controller.notifyOnCacheEmpty();
+    }
   }
 
   // never called 5/2013

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java?rev=1538234&r1=1538233&r2=1538234&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/AnalysisEngineInstancePoolWithThreadAffinity.java
 Sat Nov  2 17:21:10 2013
@@ -50,7 +50,6 @@ public class AnalysisEngineInstancePoolW
                  lock.acquireUninterruptibly();
                  // Call destroy() on AE on checkin if the UIMA AS process is 
in quiesce mode  
                  if ( destroyAEInstanceIfFree ) {
-                   System.out.println("........... 
AnalysisEngineInstancePool.checkin() - 
Thread:"+Thread.currentThread().getId()+" calling destroy() on AE checkin");
                    anAnalysisEngine.destroy();
                  } else {
              aeInstanceMap.put(Thread.currentThread().getId(), 
anAnalysisEngine);

Modified: 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java?rev=1538234&r1=1538233&r2=1538234&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/controller/BaseAnalysisEngineController.java
 Sat Nov  2 17:21:10 2013
@@ -105,6 +105,9 @@ public abstract class BaseAnalysisEngine
   private static final String JMS_PROVIDER_HOME = "ACTIVEMQ_HOME";
   public static enum ServiceState { INITIALIZING, RUNNING, DISABLED, STOPPING, 
FAILED };
   public static final boolean NO_RECOVERY = true;
+  // Semaphore use only when quiesceAndStop is called
+  // When the cache becomes empty the semaphore is released.
+  private Semaphore quiesceSemaphore = new Semaphore(0);
   
   protected ServiceState currentState = ServiceState.INITIALIZING;
   
@@ -1895,7 +1898,6 @@ public abstract class BaseAnalysisEngine
         }
       }
     }
-
     if (daemonServiceExecutor != null) {
       daemonServiceExecutor.shutdown();
     }
@@ -1907,8 +1909,8 @@ public abstract class BaseAnalysisEngine
     if (getOutputChannel() != null) {
       getOutputChannel().cancelTimers();
     }
+    
     if (this instanceof PrimitiveAnalysisEngineController) {
-
       getControllerLatch().release();
       // Stops the input channel of this service
       stopInputChannels(InputChannel.CloseAllChannels, shutdownNow);
@@ -1946,7 +1948,6 @@ public abstract class BaseAnalysisEngine
          }
        }
     }   
-    
     getInProcessCache().releaseAllCASes();
     getLocalCache().clear();
     releasedAllCASes = true;
@@ -2019,7 +2020,13 @@ public abstract class BaseAnalysisEngine
     super.destroy();
 
   }
-
+  /**
+   * This method is called by InProcessCache when the cache becomes empty 
while the controller
+   * is in Quiesce mode.
+   */
+  public void notifyOnCacheEmpty() {
+    quiesceSemaphore.release();
+  }
   /**
    * Stops input channel(s) and waits for CASes still in play to complete 
processing. When the
    * InProcessCache becomes empty, initiate the service shutdown.
@@ -2030,6 +2037,10 @@ public abstract class BaseAnalysisEngine
               UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_stop__INFO",
               new Object[] { getComponentName() });
     }
+    System.out.println("Quiescing UIMA-AS Service. Remaining Number of CASes 
to Process:"+getInProcessCache().getSize());
+    // Register callback when the inProcessCache becomes empty
+    getInProcessCache().registerController(this);
+    
     if (!isStopped() && !callbackReceived) {
       getControllerLatch().release();
       // To support orderly shutdown, the service must first stop its input 
channel and
@@ -2065,12 +2076,22 @@ public abstract class BaseAnalysisEngine
                }
         }
         stopInputChannels(InputChannel.InputChannels, true);  
-        // close JMS connection 
-        stop(false); // wait for any remaining CASes in flight to finish
+       
+        try {
+          if ( !getInProcessCache().isEmpty() ) {
+            // acquire semaphore and wait for the InProcessCache to call 
notifyOnCacheEmpty()
+            // on this controller when the cache becomes empty.
+            quiesceSemaphore.acquire();
+          }
+          System.out.println("UIMA-AS Service is Stopping, All CASes Have Been 
Processed");
+        } catch( InterruptedException e) {
+          
+        }
+        stop(false); 
       }
     }
   }
-
+  
   protected void stopDelegateTimers() {
     Iterator<Delegate> it = delegates.iterator();
     while (it.hasNext()) {

Modified: 
uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
URL: 
http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties?rev=1538234&r1=1538233&r2=1538234&view=diff
==============================================================================
--- 
uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
 (original)
+++ 
uima/uima-as/trunk/uimaj-as-jms/src/main/resources/jms_adapter_messages.properties
 Sat Nov  2 17:21:10 2013
@@ -238,3 +238,7 @@ UIMAJMS_start_inactivity_monitor__INFO=C
 UIMAJMS_discard_msg__INFO=Component: {0} Message From: {1} IP: {2} Message 
Type: {3} Command: {4} CasId: {5}
 UIMAJMS_show_stats__INFO=Component: {0} Stats for CasId:{1}\n\t{2}
 UIMAJMS_session_open__FINE=Service: {0} Opened Session To Queue:{1} Broker:{2}
+UIMAJMS_invalid_broker_url__WARNING=Unexpected State - the Broker URL is NULL
+UIMAJMS_shared_connections__INFO=Shared Connection: {0}
+UIMAJMS_retrying_jms_connection__WARNING=Connection Invalid - Retrying 
Connection to Broker:{0} Every 5 Seconds
+


Reply via email to