Author: challngr
Date: Sat Mar  9 15:09:12 2013
New Revision: 1454731

URL: http://svn.apache.org/r1454731
Log:
UIMA-2728 
Fix race with Orchestrator state during service startup and CLI/API commands.
See Jira for details.

Modified:
    
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java?rev=1454731&r1=1454730&r2=1454731&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java
 Sat Mar  9 15:09:12 2013
@@ -21,6 +21,7 @@ package org.apache.uima.ducc.sm;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -57,15 +58,18 @@ public class ServiceHandler
     private ServiceStateHandler serviceStateHandler = new 
ServiceStateHandler();
        private ServiceMap serviceMap = new ServiceMap();       // note this is 
the sync object for publish
 
-    private HashMap<DuccId, IDuccWork> newJobs = new HashMap<DuccId, 
IDuccWork>();
-    private HashMap<DuccId, IDuccWork> newServices = new HashMap<DuccId, 
IDuccWork>();
+    private Map<DuccId, IDuccWork> newJobs = new HashMap<DuccId, IDuccWork>();
+    private Map<DuccId, IDuccWork> newServices = new HashMap<DuccId, 
IDuccWork>();
     
-    private HashMap<DuccId, IDuccWork> deletedJobs = new HashMap<DuccId, 
IDuccWork>();
-    private HashMap<DuccId, IDuccWork> deletedServices = new HashMap<DuccId, 
IDuccWork>();
-    
-    private HashMap<DuccId, IDuccWork> modifiedJobs = new HashMap<DuccId, 
IDuccWork>();
-    private HashMap<DuccId, IDuccWork> modifiedServices = new HashMap<DuccId, 
IDuccWork>();
+    private Map<DuccId, IDuccWork> deletedJobs = new HashMap<DuccId, 
IDuccWork>();
+    private Map<DuccId, IDuccWork> deletedServices = new HashMap<DuccId, 
IDuccWork>();
     
+    private Map<DuccId, IDuccWork> modifiedJobs = new HashMap<DuccId, 
IDuccWork>();
+    private Map<DuccId, IDuccWork> modifiedServices = new HashMap<DuccId, 
IDuccWork>();
+
+    private List<ApiHandler> pendingRequests = new LinkedList<ApiHandler>();
+    private Object stateUpdateLock = new Object();
+
     public ServiceHandler(IServiceManager serviceManager)
     {
         this.serviceManager = serviceManager;        
@@ -82,6 +86,7 @@ public class ServiceHandler
                        }
 
             try {
+                runCommands();           // enqueued orders that came in while 
I was away
                 processUpdates();
             } catch (Throwable t) {
                 logger.error(methodName, null, t);
@@ -106,54 +111,48 @@ public class ServiceHandler
     {
        String methodName = "processUpdates";
         logger.info(methodName, null, "Processing updates.");
-        HashMap<DuccId, IDuccWork> incoming;
-
-        incoming = new HashMap<DuccId, IDuccWork>();
-        synchronized(deletedJobs) {
-            incoming.putAll(deletedJobs);
-            deletedJobs.clear();
-        }
-        handleDeletedJobs(incoming);
-
-        incoming = new HashMap<DuccId, IDuccWork>();
-        synchronized(modifiedJobs) {
-            incoming.putAll(modifiedJobs);
-            modifiedJobs.clear();
-        }
-        handleModifiedJobs(incoming);
-
-        incoming = new HashMap<DuccId, IDuccWork>();
-        synchronized(deletedServices) {
-            incoming.putAll(deletedServices);
-            deletedServices.clear();
-        }
-        handleDeletedServices(incoming);
-
-        incoming = new HashMap<DuccId, IDuccWork>();
-        synchronized(modifiedServices) {
-            incoming.putAll(modifiedServices);
-            modifiedServices.clear();
-        }
-        handleModifiedServices(incoming);
-        handleImplicitServices();
-
-        incoming = new HashMap<DuccId, IDuccWork>();
-        synchronized(newServices) {
-            incoming.putAll(newServices);
-            newServices.clear();
+        Map<DuccId, IDuccWork> deletedJobsMap      = new HashMap<DuccId, 
IDuccWork>();
+        Map<DuccId, IDuccWork> modifiedJobsMap     = new HashMap<DuccId, 
IDuccWork>();
+        Map<DuccId, IDuccWork> newJobsMap          = new HashMap<DuccId, 
IDuccWork>();
+        Map<DuccId, IDuccWork> deletedServicesMap  = new HashMap<DuccId, 
IDuccWork>();
+        Map<DuccId, IDuccWork> modifiedServicesMap = new HashMap<DuccId, 
IDuccWork>();
+        Map<DuccId, IDuccWork> newServicesMap      = new HashMap<DuccId, 
IDuccWork>();
+
+        synchronized(stateUpdateLock) {
+                deletedJobsMap.putAll(deletedJobs);
+                deletedJobs.clear();
+                        
+                modifiedJobsMap.putAll(modifiedJobs);
+                modifiedJobs.clear();
+                       
+                deletedServicesMap.putAll(deletedServices);
+                deletedServices.clear();
+                        
+                modifiedServicesMap.putAll(modifiedServices);
+                modifiedServices.clear();
+            
+                newServicesMap.putAll(newServices);
+                newServices.clear();
+                        
+                newJobsMap.putAll(newJobs);
+                newJobs.clear();
         }
-        handleNewServices(incoming);
 
-        incoming = new HashMap<DuccId, IDuccWork>();
-        synchronized(newJobs) {
-            incoming.putAll(newJobs);
-            newJobs.clear();
-        }
-        handleNewJobs(incoming);
+        // We could potentially have several updates where a service or 
arrives, is modified, and then deleted, while
+        // we are busy.  Need to handle them in the right order.  
+        //
+        // Jobs are dependent on services but not the other way around - I 
think we need to handle services first,
+        // to avoid the case where something is dependent on something that 
will exist soon but doesn't currently.
+        handleNewServices     (newServicesMap     );
+        handleModifiedServices(modifiedServicesMap);
+        handleDeletedServices (deletedServicesMap );
+        handleImplicitServices(                   );
+
+        handleNewJobs         (newJobsMap         );
+        handleModifiedJobs    (modifiedJobsMap    );
+        handleDeletedJobs     (deletedJobsMap     );
 
-        synchronized(serviceMap) {
-            serviceManager.publish(serviceMap);
-        }
+        serviceManager.publish(serviceMap);
 
         List<ServiceSet> regsvcs = serviceStateHandler.getRegisteredServices();
         for ( ServiceSet sset : regsvcs ) {
@@ -172,35 +171,40 @@ public class ServiceHandler
                                     )
     {
 
-        synchronized(newJobs) {
-            this.newJobs.putAll(newJobs);
-        }
-
-        synchronized(newServices) {
-            this.newServices.putAll(newServices);
-        }
-
-        synchronized(deletedJobs) {
-            this.deletedJobs.putAll(deletedJobs);
-        }
-
-        synchronized(deletedServices) {
+        synchronized(stateUpdateLock) {
+            this.newJobs.putAll(newJobs);            
+            this.newServices.putAll(newServices);            
+            this.deletedJobs.putAll(deletedJobs);            
             this.deletedServices.putAll(deletedServices);
-        }
-
-        synchronized(modifiedJobs) {
             this.modifiedJobs.putAll(modifiedJobs);
-        }
-
-        synchronized(modifiedServices) {
             this.modifiedServices.putAll(modifiedServices);
         }
-      
         synchronized(this) {
             notify();
         }
     } 
-    
+
+    void runCommands()
+    {
+        String methodName = "runCommands";
+        LinkedList<ApiHandler> tmp = new LinkedList<ApiHandler>();
+        synchronized(pendingRequests) {
+            tmp.addAll(pendingRequests);
+            pendingRequests.clear();
+        }
+        logger.info(methodName, null, "Running", tmp.size(), "API Tasks.");
+        for ( ApiHandler apih : tmp ) {
+            apih.run();
+        }
+    }
+
+    void addApiTask(ApiHandler apih)
+    {
+        synchronized(pendingRequests) {
+            pendingRequests.add(apih);
+        }
+    }
+
     /**
      * Resolves state for the job in id based on the what it is dependent upon 
- the independent services
      */
@@ -295,7 +299,7 @@ public class ServiceHandler
         return jobServices;
     }
  
-    protected void handleNewJobs(HashMap<DuccId, IDuccWork> work)
+    protected void handleNewJobs(Map<DuccId, IDuccWork> work)
     { 
         String methodName = "handleNewJobs";
 
@@ -380,7 +384,7 @@ public class ServiceHandler
         serviceStateHandler.removeServicesForJob(id);            
     }
 
-    protected void handleDeletedJobs(HashMap<DuccId, IDuccWork> work)
+    protected void handleDeletedJobs(Map<DuccId, IDuccWork> work)
     {
         String methodName = "handleCompletedJobs";
 
@@ -401,7 +405,7 @@ public class ServiceHandler
         serviceMap.removeAll(work.keySet());
     }
 
-    protected void handleModifiedJobs(HashMap<DuccId, IDuccWork> work)
+    protected void handleModifiedJobs(Map<DuccId, IDuccWork> work)
     {
         String methodName = "handleModifiedJobs";
 
@@ -433,7 +437,7 @@ public class ServiceHandler
 
     }
 
-    protected void handleNewServices(HashMap<DuccId, IDuccWork> work)
+    protected void handleNewServices(Map<DuccId, IDuccWork> work)
     {
         String methodName = "handleNewServices";
 
@@ -544,7 +548,7 @@ public class ServiceHandler
     // We're here because we got OR state for the service that it has stopped 
running.
     // Must clean up.
     //
-    protected void handleDeletedServices(HashMap<DuccId, IDuccWork> work)
+    protected void handleDeletedServices(Map<DuccId, IDuccWork> work)
     {
         String methodName = "handleDeletedServices";
 
@@ -595,7 +599,7 @@ public class ServiceHandler
         }
     }
 
-    protected void handleModifiedServices(HashMap<DuccId, IDuccWork> work)
+    protected void handleModifiedServices(Map<DuccId, IDuccWork> work)
     {
         String methodName = "handleModifiedServices";        
         
@@ -753,10 +757,13 @@ public class ServiceHandler
                                              sset.getId());
             }
 
-            // only start something if we don't have enought already going
-            ApiHandler  apih = new ApiHandler(ev, this);
-            Thread t = new Thread(apih);
-            t.start();
+            pendingRequests.add(new ApiHandler(ev, this));
+
+//             // only start something if we don't have enought already going
+//             ApiHandler  apih = new ApiHandler(ev, this);
+//             Thread t = new Thread(apih);
+//             t.start();
+
             return new ServiceReplyEvent(ServiceCode.OK, 
                                          "Service " + serviceIdString + " 
start request accepted, new instances[" + wanted + "]", 
                                          sset.getKey(), 
@@ -833,9 +840,10 @@ public class ServiceHandler
             
 
             if ( tolose > 0 ) {
-                ApiHandler  apih = new ApiHandler(ev, this);
-                Thread t = new Thread(apih);
-                t.start();
+                pendingRequests.add(new ApiHandler(ev, this));
+//                 ApiHandler  apih = new ApiHandler(ev, this);
+//                 Thread t = new Thread(apih);
+//                 t.start();
             }
 
             return new ServiceReplyEvent(ServiceCode.OK, "Service " + 
serviceIdString + " stop request accepted for [" + tolose + "] instances.", 
sset.getKey(), sset.getId());
@@ -951,10 +959,10 @@ public class ServiceHandler
         }
 
        if ( sset.isRegistered() ) {            
-            ApiHandler  apih = new ApiHandler(ev, this);
-
-            Thread t = new Thread(apih);
-            t.start();
+            pendingRequests.add(new ApiHandler(ev, this));
+//             ApiHandler  apih = new ApiHandler(ev, this);
+//             Thread t = new Thread(apih);
+//             t.start();
             return new ServiceReplyEvent(ServiceCode.OK, "Service " + 
serviceIdString + " modify request accepted.", sset.getKey(), sset.getId());
         } else {
             return new ServiceReplyEvent(ServiceCode.NOTOK, "Service " + 
friendly + " is not a known service.", sset.getKey(), null);            
@@ -1005,9 +1013,10 @@ public class ServiceHandler
 
         if ( sset.isRegistered() ) {            
             sset.deregister();          // just sets a flag so we know how to 
handle it when it starts to die
-            ApiHandler  apih = new ApiHandler(ev, this);
-            Thread t = new Thread(apih);
-            t.start();
+            pendingRequests.add(new ApiHandler(ev, this));
+//             ApiHandler  apih = new ApiHandler(ev, this);
+//             Thread t = new Thread(apih);
+//             t.start();
             return new ServiceReplyEvent(ServiceCode.OK, "Service " + 
serviceIdString + " unregistered. Shutting down implementors.", sset.getKey(), 
sset.getId());
         } else {
             return new ServiceReplyEvent(ServiceCode.NOTOK, "Service " + 
serviceIdString + " is not a registered service.", sset.getKey(), null);        
    
@@ -1184,12 +1193,12 @@ public class ServiceHandler
     {
 
         // Map of active service descriptors by endpoint.  For UIMA services, 
key is the endpoint.
-        private HashMap<String,  ServiceSet>  servicesByName     = new 
HashMap<String,  ServiceSet>();
-        private HashMap<Long,    ServiceSet>  servicesByFriendly = new 
HashMap<Long,    ServiceSet>();
+        private Map<String,  ServiceSet>  servicesByName     = new 
HashMap<String,  ServiceSet>();
+        private Map<Long,    ServiceSet>  servicesByFriendly = new 
HashMap<Long,    ServiceSet>();
 
         // For each job, the collection of services it is dependent upon
         // DUccId is a Job Id (or id for serice that has dependencies)
-        private HashMap<DuccId, Map<String, ServiceSet>>  servicesByJob = new 
HashMap<DuccId, Map<String, ServiceSet>>();
+        private Map<DuccId, Map<String, ServiceSet>>  servicesByJob = new 
HashMap<DuccId, Map<String, ServiceSet>>();
 
         ServiceStateHandler()
         {

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java?rev=1454731&r1=1454730&r2=1454731&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java
 Sat Mar  9 15:09:12 2013
@@ -98,6 +98,8 @@ public class ServiceManagerComponent 
     private String service_seqno = "service.seqno";
     private DuccIdFactory idFactory = new DuccIdFactory();
 
+    private boolean initialized = false;
+
        public ServiceManagerComponent(CamelContext context) 
     {
                super("ServiceManager", context);
@@ -212,6 +214,10 @@ public class ServiceManagerComponent 
         } 
 
         idFactory = new DuccIdFactory(seq);
+
+        synchronized(this) {
+            initialized = true;
+        }
     }
        
        @Override
@@ -327,6 +333,7 @@ public class ServiceManagerComponent 
             if ( w.getDuccType() == DuccType.Reservation ) continue;
 
             if ( !((DuccWorkJob)w).isActive() ) continue;         // not 
active, we don't care about it. likely after restart.
+
             logger.debug(methodName, w.getDuccId(), "Reconciling, adding", 
w.getDuccType());
                        switch(w.getDuccType()) {
               case Job:
@@ -337,8 +344,8 @@ public class ServiceManagerComponent 
               case Service:
                   localMap.addDuccWork(w);
                   // An arbitrary process is **almost** the same as a service 
in terms of how most of DUCC
-                  // handles it.  In order to transparently reuse all that 
code it is classified as a
-                  // special type of service, "other", which the SM treats as 
a regular job.
+                  // handles it.  To me (SM), however, it is just like any 
other job so it goes into
+                  // the job map.
                   switch ( ((IDuccWorkService)w).getServiceDeploymentType() ) 
                   {
                       case uima:
@@ -357,7 +364,7 @@ public class ServiceManagerComponent 
             }
         }
 
-        // Stuff on the left is stuff we have but or doesn't
+        // Stuff on the right is stuff we have but OR doesn't
         work = diffmap.getRight();
         for ( IDuccWork w : work.values() ) {
             if ( w.getDuccType() == DuccType.Reservation ) continue;
@@ -388,34 +395,54 @@ public class ServiceManagerComponent 
             }
         }
 
-        // Now: stuff we both know about
+        // Now: stuff we both know about.  Stuff on left is incoming.
         for( DuccMapValueDifference<IDuccWork> jd: diffmap ) {
 
-            IDuccWork r = jd.getRight();
             IDuccWork l = jd.getLeft();
+            IDuccWork r = jd.getRight();
 
             if ( l.getDuccType() == DuccType.Reservation ) continue;
 
             logger.debug(methodName, l.getDuccId(), "Reconciling, incoming 
state = ", l.getStateObject(), " my state = ", r.getStateObject());
+            boolean active = ((DuccWorkJob)l).isActive();
 
             // Update our own state by replacing the old (right) object with 
the new (left)
                        switch(l.getDuccType()) {
               case Job:
-                  modifiedJobs.put(l.getDuccId(), l);
-                  localMap.addDuccWork(l);
+                  if ( active ) {
+                      modifiedJobs.put(l.getDuccId(), l);
+                      localMap.addDuccWork(l);
+                  } else {
+                      deletedJobs.put(l.getDuccId(), l);
+                      localMap.removeDuccWork(l.getDuccId());
+                  }
                   break;
 
               case Service:
-                  localMap.addDuccWork(l);
-                  switch ( ((IDuccWorkService)l).getServiceDeploymentType() ) 
-                  {
-                      case uima:
-                      case custom:
-                          modifiedServices.put(l.getDuccId(), l);
-                          break;
-                      case other:
-                          modifiedJobs.put(l.getDuccId(), l);
-                          break;
+                  if ( active ) {
+                      localMap.addDuccWork(l);
+                      switch ( 
((IDuccWorkService)l).getServiceDeploymentType() ) 
+                          {
+                          case uima:
+                          case custom:
+                              modifiedServices.put(l.getDuccId(), l);
+                              break;
+                          case other:
+                              modifiedJobs.put(l.getDuccId(), l);
+                              break;
+                          }
+                  } else {
+                      localMap.removeDuccWork(l.getDuccId());
+                      switch ( 
((IDuccWorkService)l).getServiceDeploymentType() ) 
+                          {
+                          case uima:
+                          case custom:
+                              deletedServices.put(l.getDuccId(), l);
+                              break;
+                          case other:
+                              deletedJobs.put(l.getDuccId(), l);
+                              break;
+                          }
                   }
                   break;
 
@@ -658,6 +685,12 @@ public class ServiceManagerComponent 
 
     public synchronized void orchestratorStateArrives(DuccWorkMap map)
     {
+       String methodName = "orchestratorStateArrives";
+        if ( ! initialized ) {
+            logger.info(methodName, null, "SM not initialized, ignoring 
Orchestrator message.");
+            return;
+        }
+
         epochCounter++;
         incomingMap = map;
         notify();

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java?rev=1454731&r1=1454730&r2=1454731&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
 Sat Mar  9 15:09:12 2013
@@ -71,7 +71,7 @@ public class ServiceSet
     // key is job/service id, value is same.  it's a map for fast existence 
check
     HashMap<DuccId, DuccId> references = new HashMap<DuccId, DuccId>();
 
-    // For a registered service, here is my registered it
+    // For a registered service, here is my registered id
     DuccId id;
     HashMap<Long, DuccId> friendly_ids = new HashMap<Long, DuccId>();
 
@@ -416,14 +416,17 @@ public class ServiceSet
      */
     void enforceAutostart()
     {
+       String methodName = "enforceAutostart";
         if ( ! autostart ) return;                   // not doing auto, 
nothing to do
         if ( stopped     ) return;                   // doing auto, but we've 
been manually stopped
 
         // could have more implementors than instances if some were started 
dynamically but the count not persisted
-        int needed = Math.max(0, instances - implementors.size());
+        int needed = Math.max(0, instances - friendly_ids.size());
 
+        logger.debug(methodName, null, "ENFORCE: ", needed);
         while ( (needed--) > 0 ) {
             start();
+            logger.debug(methodName, null, "ENFORCE: ", needed);
         }
     }
 
@@ -1064,6 +1067,7 @@ public class ServiceSet
     {
        String methodName = "start";
 
+        logger.debug(methodName, null, "START START START START START START 
START START");
 //         if ( service_type == ServiceType.Custom ) { 
 //             establish();
 //             return;
@@ -1087,7 +1091,13 @@ public class ServiceSet
         };
             
         for ( int i = 0; i < args.length; i++ ) { 
-            logger.debug(methodName, null, "Args[", i, "]:", args[i]);
+            if ( i > 0 && (args[i-1].equals("-cp") ) ) {
+                // The classpaths can be just awful filling the logs with 
junk.  It will end up in the agent log
+                // anyway so let's inhibit it here.
+                logger.debug(methodName, null, "Args[", i, "]: <CLASSPATH>");
+            } else {
+                logger.debug(methodName, null, "Args[", i, "]:", args[i]);
+            }
         }
 
         ProcessBuilder pb = new ProcessBuilder(args);
@@ -1124,8 +1134,19 @@ public class ServiceSet
                }
 
         // That was annoying.  Now search the lines for some hint of the id.
+        boolean inhibit_cp = false;
         for ( String s : stdout_lines ) {
-            logger.debug(methodName, id, "Start stdout:", s);
+            if ( inhibit_cp ) {
+                // The classpaths can be just awful filling the logs with 
junk.  It will end up in the agent log
+                // anyway so let's inhibit it here.
+                logger.debug(methodName, id, "<INHIBITED CP>");
+                inhibit_cp = false;
+            } else {
+                logger.debug(methodName, id, "Start stdout:", s);
+                if ( s.indexOf("-cp") >= 0 ) {
+                    inhibit_cp = true;
+                }
+            }
         }
 
         for ( String s : stderr_lines ) {
@@ -1160,6 +1181,7 @@ public class ServiceSet
             setServiceState(ServiceState.Initializing);
         }
         saveMetaProperties();
+        logger.debug(methodName, null, "ENDSTART ENDSTART ENDSTART ENDSTART 
ENDSTART ENDSTART");
     }
 
     /**


Reply via email to