Author: challngr
Date: Tue Jun 17 14:38:50 2014
New Revision: 1603187

URL: http://svn.apache.org/r1603187
Log:
UIMA-3727 Updates

Modified:
    
uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/java/org/apache/uima/ducc/ping/SamplePing.java

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/java/org/apache/uima/ducc/ping/SamplePing.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/java/org/apache/uima/ducc/ping/SamplePing.java?rev=1603187&r1=1603186&r2=1603187&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/java/org/apache/uima/ducc/ping/SamplePing.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-examples/src/main/java/org/apache/uima/ducc/ping/SamplePing.java
 Tue Jun 17 14:38:50 2014
@@ -16,9 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
 */
-
 package org.apache.uima.ducc.ping;
 
+/*
+ * IMPORTANT: This is a sample custom pinger for illustration purposes only.  
It is not
+ *            supported in any way.
+ */
 
 import java.io.IOException;
 import java.io.StringReader;
@@ -47,34 +50,37 @@ import org.apache.uima.ducc.common.utils
 import org.apache.uima.resource.ResourceInitializationException;
 import org.apache.uima.util.Level;
 
+
+
 // 'q_thresh=nn,window=mm,broker_jmx=1100,meta_timeout=10000'
 public class SamplePing
     extends AServicePing
 {
-    String ep;
+    String ep;                           // Endpoint, passed in during 
initialization
+
+    String qname;                        // Service queue name, parsed from ep
+    String broker;                       // Service broker, parsed from ep
+    int    meta_timeout = 5000;          // default
+
+    String broker_host;                  // AMQ hostname, parsed from 'broker' 
URL
+    int    broker_jmx_port = 1099;       // AMQ jmx port, default
 
-    String endpoint;
-    String broker;
-    int    meta_timeout = 5000;
-
-    String broker_host;
-    int    broker_jmx_port = 1099;
-    boolean connected;
-    UimaAsServiceMonitor monitor;
-    DuccLogger logger = null;
-
-    String nodeIp;
-    String pid;
-    boolean gmfail = false;
+    UimaAsServiceMonitor monitor;        // Part of ping API, knows how to 
query AMQ for broker stats
 
-    int additions = 0;                   // n additions to signal to SM
-    Long[] deletions = null;             // which instances to shrink
+    String nodeIp;                       // For UIMA-AS get-meta callback, IP 
of node answering get-meta
+    String pid;                          //  "     "       "        "    , 
process answering get-meta
+    boolean gmfail = false;              // Did get-meta work?
+    boolean fast_shrink = true;          // If false, don't shrink instances 
if there are producers
+                                         // still connected to the Q.
 
-    int min_instances = 0;               // smallest instances to maintain
+    int additions = 0;                   // number of additions to signal to SM
+    Long[] deletions = null;             // which specific instances to shrink
+
+    int min_instances = 0;               // minimum instances to maintain
     int max_instances = 10;              // max instances to allow
 
     int max_growth = 5;                  // max processes to grow at any time
-    double goal = 2.00;                  // want to get enqueue time towith 
this factor of individual service time
+    double goal = 2.00;                  // want to get enqueue time to within 
this factor of individual service time
 
     int cursor = 0;                      // growth and shrinkage window cursor
     int expansion_period = 5;            // size of window in minutes
@@ -84,31 +90,26 @@ public class SamplePing
 
     public SamplePing()
     {
-        this.logger = null;
-    }
 
-    public SamplePing(DuccLogger logger)
-    {
-        this.logger = logger;
     }
 
     public void init(String args, String ep)
         throws Exception
     {
         String methodName = "init";
-        this.ep = ep;
 
+        this.ep = ep;
         doLog(methodName, "Ping.init(" + args + ", " + ep + " start.");
-        // Ep is of the form UIMA-AS:queuename:broker
+
+        // Ep is of the form UIMA-AS:queuename:broker. Parse out queue name 
and broker URL
         int ndx = ep.indexOf(":");
         ep = ep.substring(ndx+1);
         ndx = ep.indexOf(":");
             
-        this.endpoint = ep.substring(0, ndx).trim();
+        this.qname = ep.substring(0, ndx).trim();
         this.broker = ep.substring(ndx+1).trim();
 
-        // broker is a URL that we need to parse in order to get the actual 
host and port
-        // for jmx
+        // broker is a URL that we need to parse in order to get the actual 
host for jmx calls
         URL url = null;
         try {                
             url = new URL(null, broker, new TcpStreamHandler());
@@ -116,18 +117,19 @@ public class SamplePing
             throw new IllegalArgumentException("Invalid broker URL: " + 
broker);
         }
         broker_host = url.getHost();
-        // not needed here fyi broker_port = url.getPort();
-
-                
+        
+        
+        // inhibit noisy default UIMAAs logging
         
UIMAFramework.getLogger(BaseUIMAAsynchronousEngineCommon_impl.class).setLevel(Level.OFF);
         
UIMAFramework.getLogger(BaseUIMAAsynchronousEngine_impl.class).setLevel(Level.OFF);
-        //UIMAFramework.getLogger().setLevel(Level.OFF);
-        // there are a couple junky messages that slip by the above 
configurations.  turn the whole danged thing off.
         UIMAFramework.getLogger().setLevel(Level.INFO);
 
+        // Parse out the pinger arguments which are comma-delimited in the 
argument string.
+        // We do this by splitting on ',', then writing them one per line to a 
StringWriter, and
+        // finally loading a PropertiesFile from them.  DuccProperties is used 
because it
+        // provides smart typed extraction of the properties.
         if ( args != null ) {
-            // 'q_thresh=nn,window=mm,broker_jmx_port=1100,meta_timeout=10000'
-            // turn the argument string into properties
+
             String[] as = args.split(",");
             StringWriter sw = new StringWriter();
             for ( String s : as ) sw.write(s + "\n");
@@ -142,23 +144,24 @@ public class SamplePing
             meta_timeout = props.getIntProperty("meta-timeout", meta_timeout);
             broker_jmx_port = props.getIntProperty("broker-jmx-port", 
broker_jmx_port);
             expansion_period = props.getIntProperty("window", 
expansion_period);
-            this.log_enabled = props.getBooleanProperty("enable-log", 
this.log_enabled);
             min_instances = props.getIntProperty("min", min_instances);
             max_instances = props.getIntProperty("max", max_instances);
             max_growth = props.getIntProperty("max-growth", max_growth);
+            fast_shrink = props.getBooleanProperty("fast-shrink", true);
             goal = props.getDoubleProperty("goal", goal);
         }
 
-        // insure window is in minutes, with one pane per ping
+        // Set up expansion/deletion windows sized in ping intervals so that 
they always
+        // span expansion_period minutes regardless of ping frequency.
         double calls_per_minute = 60000.00 / monitor_rate;
         window_size = (int) ( ((double)expansion_period) * calls_per_minute);
 
         expansion_window = new int[window_size];
         deletion_window  = new int[window_size];
 
-        System.out.println("log_enabled is " + log_enabled);
         doLog("<ctr>", 
               "INIT: meta-timeout", meta_timeout, 
+              "broker-host", broker_host,
               "broker-jmx-port", broker_jmx_port, 
               "monitor-window", expansion_period,
               "window-size", window_size,
@@ -167,12 +170,15 @@ public class SamplePing
               "min-instances", min_instances,
               "max-growth", max_growth,
               "goal", goal,
-              "enable-log", log_enabled);
+              "fast-shrink", fast_shrink);
 
-        this.monitor = new UimaAsServiceMonitor(endpoint, broker_host, 
broker_jmx_port);
+        // Set up the jmx queue monitor and reset statistics to insure each 
calculation is
+        // based on current data, not old data.
+        this.monitor = new UimaAsServiceMonitor(qname, broker_host, 
broker_jmx_port);
         monitor.resetStatistics();
     }
 
+    // Release resources when terminated.
     public void stop()
     {
        String methodName = "stop";
@@ -180,6 +186,9 @@ public class SamplePing
         doLog(methodName, "------------ Stop signal arrives, pinger exits.");
     }
 
+    // This is called by the Service Manager to collect the pinger data.  This 
pinger 
+    // issues a get-meta to the service to insure it's alive, then collects 
queue statistics via JMX
+    // and determines expansion and deletion for return when SM asks about 
them.
     public IServiceStatistics getStatistics()
     {
         String methodName = "getStatistics";
@@ -189,30 +198,30 @@ public class SamplePing
 
         // "health" has no real meaning.  maybe we can get rid of it one day?
         try {
-            monitor.collect();
+            monitor.collect();                         // Get jmx stuff
             statistics.setHealthy(true);
         } catch ( Throwable t ) {
             statistics.setHealthy(false);
             monitor.setJmxFailure(t.getMessage());
         }
 
-        // Instantiate Uima AS Client
+        // Instantiate Uima AS Client and issue get-meta
         BaseUIMAAsynchronousEngine_impl uimaAsEngine = new 
BaseUIMAAsynchronousEngine_impl();
         UimaCbListener listener = new UimaCbListener();
         uimaAsEngine.addStatusCallbackListener(listener);
         Map<String, Object> appCtx = new HashMap<String, Object>();
         appCtx.put(UimaAsynchronousEngine.ServerUri, broker);
-        appCtx.put(UimaAsynchronousEngine.ENDPOINT, endpoint);
+        appCtx.put(UimaAsynchronousEngine.ENDPOINT, qname);
         appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, meta_timeout);
 
         try {
-            uimaAsEngine.initialize(appCtx);
+            uimaAsEngine.initialize(appCtx);                     // This 
performs the get-meta
             statistics.setAlive(true);
             statistics.setHealthy(true && statistics.isHealthy());
             listener.ok();
         } catch( ResourceInitializationException e) {
-            listener.timeout();
-            doLog(methodName, "Cannot issue getMeta to: " + endpoint + ":" + 
broker);
+            listener.timeout();                                 // Service not 
responding
+            doLog(methodName, "Cannot issue getMeta to: " + qname + ":" + 
broker);
             statistics.setHealthy(false);
             statistics.setAlive(false);
         } finally {
@@ -223,20 +232,20 @@ public class SamplePing
                        }
         }
 
-        monitor.setSource(nodeIp, pid, gmfail);
-        statistics.setInfo(monitor.format());
+        monitor.setSource(nodeIp, pid, gmfail);                  // remember 
who responded
+        statistics.setInfo(monitor.format());                    // set string 
for web server
 
-        calculateNewDeployment(statistics);
+        calculateNewDeployment(statistics);                      // Decide on 
instance expansion or deletion
 
         return statistics;
     }
 
     /**
-     * Override from AServicePing
+     * Override from AServicePing, set in calculateNewDeployment while 
analyzing queue statistics
      */
     public long getLastUse()
     {
-        return last_use;
+        return last_use;                 
     }
 
     // 
================================================================================
@@ -293,7 +302,7 @@ public class SamplePing
         int new_ni = 0;
         // what we're looking for it Tt: the average time it takes on thread 
to compute one unit of work.
         if ( Q == 0 ) {
-            if ( pc != 0 ) {
+            if ( (pc != 0) && ( !fast_shrink) ) {
                 doLog(methodName, "Inhibit shrinkage because pc =", pc, "Q =", 
Q);
                 deletion_window[cursor] = 0;
             } else {
@@ -311,24 +320,20 @@ public class SamplePing
             double g = Tt * goal;   // we want to get eT close to this
             double r = eT / g;      // the ratio of current queue to to 
desired, which is the amount we need to 
                                     // change the instances by
-            doLog(methodName, "eT", eT, "Q", Q, "cc", cc, "Ti", Ti, "Tt", Tt, 
"g", g, "r", r, "active", active, "ninstances", ninst, "max_instances", 
max_instances);
+            doLog(methodName, "eT", eT, "Q", Q, "cc", cc, "Ti", Ti, "Tt", Tt, 
"g", g, "r", r, 
+                  "active", active, "ninstances", ninst, "max_instances", 
max_instances);
 
             if ( r > 1 ) {          // we want moref
 
                 // We must smooth it out.  First get the delta instances.
+                // Then cap it accounting for instances started but not yet 
connected to the q
                 // Then cap it on the max for the service.
                 // Then cap it on (arbitrarily) 5 so we don't blast too many 
new instances at once
-                new_ni = (int) Math.ceil(active * r);
-                doLog(methodName, "1 new_ni", new_ni);
-
+                new_ni = (int) Math.ceil(active * r);  // delta
                 new_ni = Math.max(new_ni - ninst, 0);  // here we account for 
intances that aren't yet started
-                doLog(methodName, "2 new_ni", new_ni);
-
-                new_ni = Math.min(new_ni, max_instances);
-                doLog(methodName, "3 new_ni", new_ni);
+                new_ni = Math.min(new_ni, max_instances); // cap on configured 
max
+                new_ni = Math.min(new_ni, max_growth);    // cap on growth rate
 
-                new_ni = Math.min(new_ni, max_growth);
-                doLog(methodName, "4 new_ni", new_ni);
 
                 if ( new_ni > 0 ) {                         // do we actually 
expand in the end?
                     doLog(methodName, "Expand, new_n1:", new_ni);
@@ -343,7 +348,7 @@ public class SamplePing
             }
 
             if ( r < .5 ) {                            // we're 
over-provisioned to the goal
-                if ( pc != 0 ) {
+                if ( (pc != 0) && (!fast_shrink) ) {
                     // never shrink if there are producers
                     doLog(methodName, "Inhibit shrinkage because pc =", pc, 
"r=", r);
                     deletion_window[cursor] = 0;
@@ -397,7 +402,8 @@ public class SamplePing
     //                                         NEW INTERFACES
     // 
================================================================================
     /**
-     * Implement this to indicate how many new instances to start.
+     * Implement this to indicate how many new instances to start.  The value 
here
+     * is calculated above in calculateNewDeployment.
      */
     public int getAdditions()
     {
@@ -405,7 +411,8 @@ public class SamplePing
     }
 
     /**
-     * Implement this to indicate how many instances to stop
+     * Implement this to indicate how many instances to stop.  The value here
+     * is calculated above in calculateNewDeployment.
      */
     public Long[] getDeletions()
     {
@@ -416,6 +423,10 @@ public class SamplePing
     //                                         END NEW INTERFACES
     // 
================================================================================
 
+    /**
+     * This is a callback class for the UIMA-AS get-meta, that tells us which 
process on 
+     * which host responded to the get-meta.
+     */
     class UimaCbListener extends UimaAsBaseCallbackListener 
     {
         public UimaCbListener()


Reply via email to