Author: challngr Date: Sat Mar 9 15:09:12 2013 New Revision: 1454731 URL: http://svn.apache.org/r1454731 Log: UIMA-2728 Fix race with Orchestrator state during service startup and CLI/API commands. See Jira for details.
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java?rev=1454731&r1=1454730&r2=1454731&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceHandler.java Sat Mar 9 15:09:12 2013 @@ -21,6 +21,7 @@ package org.apache.uima.ducc.sm; import java.io.File; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -57,15 +58,18 @@ public class ServiceHandler private ServiceStateHandler serviceStateHandler = new ServiceStateHandler(); private ServiceMap serviceMap = new ServiceMap(); // note this is the sync object for publish - private HashMap<DuccId, IDuccWork> newJobs = new HashMap<DuccId, IDuccWork>(); - private HashMap<DuccId, IDuccWork> newServices = new HashMap<DuccId, IDuccWork>(); + private Map<DuccId, IDuccWork> newJobs = new HashMap<DuccId, IDuccWork>(); + private Map<DuccId, IDuccWork> newServices = new HashMap<DuccId, IDuccWork>(); - private HashMap<DuccId, IDuccWork> deletedJobs = new HashMap<DuccId, IDuccWork>(); - private HashMap<DuccId, IDuccWork> deletedServices = new HashMap<DuccId, IDuccWork>(); - - private HashMap<DuccId, IDuccWork> modifiedJobs = new HashMap<DuccId, IDuccWork>(); - private HashMap<DuccId, IDuccWork> modifiedServices = new HashMap<DuccId, IDuccWork>(); + private Map<DuccId, IDuccWork> deletedJobs = new HashMap<DuccId, IDuccWork>(); + private Map<DuccId, IDuccWork> deletedServices = new HashMap<DuccId, IDuccWork>(); + private Map<DuccId, IDuccWork> modifiedJobs = new HashMap<DuccId, IDuccWork>(); + private Map<DuccId, IDuccWork> modifiedServices = new HashMap<DuccId, IDuccWork>(); + + private List<ApiHandler> pendingRequests = new LinkedList<ApiHandler>(); + private Object stateUpdateLock = new Object(); + public ServiceHandler(IServiceManager serviceManager) { this.serviceManager = serviceManager; @@ -82,6 +86,7 @@ public class ServiceHandler } try { + runCommands(); // enqueued orders that came in while I was away processUpdates(); } catch (Throwable t) { logger.error(methodName, null, t); @@ -106,54 +111,48 @@ public class ServiceHandler { String methodName = "processUpdates"; logger.info(methodName, null, "Processing updates."); - HashMap<DuccId, IDuccWork> incoming; - - incoming = new HashMap<DuccId, IDuccWork>(); - synchronized(deletedJobs) { - incoming.putAll(deletedJobs); - deletedJobs.clear(); - } - handleDeletedJobs(incoming); - - incoming = new HashMap<DuccId, IDuccWork>(); - synchronized(modifiedJobs) { - incoming.putAll(modifiedJobs); - modifiedJobs.clear(); - } - handleModifiedJobs(incoming); - - incoming = new HashMap<DuccId, IDuccWork>(); - synchronized(deletedServices) { - incoming.putAll(deletedServices); - deletedServices.clear(); - } - handleDeletedServices(incoming); - - incoming = new HashMap<DuccId, IDuccWork>(); - synchronized(modifiedServices) { - incoming.putAll(modifiedServices); - modifiedServices.clear(); - } - handleModifiedServices(incoming); - handleImplicitServices(); - - incoming = new HashMap<DuccId, IDuccWork>(); - synchronized(newServices) { - incoming.putAll(newServices); - newServices.clear(); + Map<DuccId, IDuccWork> deletedJobsMap = new HashMap<DuccId, IDuccWork>(); + Map<DuccId, IDuccWork> modifiedJobsMap = new HashMap<DuccId, IDuccWork>(); + Map<DuccId, IDuccWork> newJobsMap = new HashMap<DuccId, IDuccWork>(); + Map<DuccId, IDuccWork> deletedServicesMap = new HashMap<DuccId, IDuccWork>(); + Map<DuccId, IDuccWork> modifiedServicesMap = new HashMap<DuccId, IDuccWork>(); + Map<DuccId, IDuccWork> newServicesMap = new HashMap<DuccId, IDuccWork>(); + + synchronized(stateUpdateLock) { + deletedJobsMap.putAll(deletedJobs); + deletedJobs.clear(); + + modifiedJobsMap.putAll(modifiedJobs); + modifiedJobs.clear(); + + deletedServicesMap.putAll(deletedServices); + deletedServices.clear(); + + modifiedServicesMap.putAll(modifiedServices); + modifiedServices.clear(); + + newServicesMap.putAll(newServices); + newServices.clear(); + + newJobsMap.putAll(newJobs); + newJobs.clear(); } - handleNewServices(incoming); - incoming = new HashMap<DuccId, IDuccWork>(); - synchronized(newJobs) { - incoming.putAll(newJobs); - newJobs.clear(); - } - handleNewJobs(incoming); + // We could potentially have several updates where a service or arrives, is modified, and then deleted, while + // we are busy. Need to handle them in the right order. + // + // Jobs are dependent on services but not the other way around - I think we need to handle services first, + // to avoid the case where something is dependent on something that will exist soon but doesn't currently. + handleNewServices (newServicesMap ); + handleModifiedServices(modifiedServicesMap); + handleDeletedServices (deletedServicesMap ); + handleImplicitServices( ); + + handleNewJobs (newJobsMap ); + handleModifiedJobs (modifiedJobsMap ); + handleDeletedJobs (deletedJobsMap ); - synchronized(serviceMap) { - serviceManager.publish(serviceMap); - } + serviceManager.publish(serviceMap); List<ServiceSet> regsvcs = serviceStateHandler.getRegisteredServices(); for ( ServiceSet sset : regsvcs ) { @@ -172,35 +171,40 @@ public class ServiceHandler ) { - synchronized(newJobs) { - this.newJobs.putAll(newJobs); - } - - synchronized(newServices) { - this.newServices.putAll(newServices); - } - - synchronized(deletedJobs) { - this.deletedJobs.putAll(deletedJobs); - } - - synchronized(deletedServices) { + synchronized(stateUpdateLock) { + this.newJobs.putAll(newJobs); + this.newServices.putAll(newServices); + this.deletedJobs.putAll(deletedJobs); this.deletedServices.putAll(deletedServices); - } - - synchronized(modifiedJobs) { this.modifiedJobs.putAll(modifiedJobs); - } - - synchronized(modifiedServices) { this.modifiedServices.putAll(modifiedServices); } - synchronized(this) { notify(); } } - + + void runCommands() + { + String methodName = "runCommands"; + LinkedList<ApiHandler> tmp = new LinkedList<ApiHandler>(); + synchronized(pendingRequests) { + tmp.addAll(pendingRequests); + pendingRequests.clear(); + } + logger.info(methodName, null, "Running", tmp.size(), "API Tasks."); + for ( ApiHandler apih : tmp ) { + apih.run(); + } + } + + void addApiTask(ApiHandler apih) + { + synchronized(pendingRequests) { + pendingRequests.add(apih); + } + } + /** * Resolves state for the job in id based on the what it is dependent upon - the independent services */ @@ -295,7 +299,7 @@ public class ServiceHandler return jobServices; } - protected void handleNewJobs(HashMap<DuccId, IDuccWork> work) + protected void handleNewJobs(Map<DuccId, IDuccWork> work) { String methodName = "handleNewJobs"; @@ -380,7 +384,7 @@ public class ServiceHandler serviceStateHandler.removeServicesForJob(id); } - protected void handleDeletedJobs(HashMap<DuccId, IDuccWork> work) + protected void handleDeletedJobs(Map<DuccId, IDuccWork> work) { String methodName = "handleCompletedJobs"; @@ -401,7 +405,7 @@ public class ServiceHandler serviceMap.removeAll(work.keySet()); } - protected void handleModifiedJobs(HashMap<DuccId, IDuccWork> work) + protected void handleModifiedJobs(Map<DuccId, IDuccWork> work) { String methodName = "handleModifiedJobs"; @@ -433,7 +437,7 @@ public class ServiceHandler } - protected void handleNewServices(HashMap<DuccId, IDuccWork> work) + protected void handleNewServices(Map<DuccId, IDuccWork> work) { String methodName = "handleNewServices"; @@ -544,7 +548,7 @@ public class ServiceHandler // We're here because we got OR state for the service that it has stopped running. // Must clean up. // - protected void handleDeletedServices(HashMap<DuccId, IDuccWork> work) + protected void handleDeletedServices(Map<DuccId, IDuccWork> work) { String methodName = "handleDeletedServices"; @@ -595,7 +599,7 @@ public class ServiceHandler } } - protected void handleModifiedServices(HashMap<DuccId, IDuccWork> work) + protected void handleModifiedServices(Map<DuccId, IDuccWork> work) { String methodName = "handleModifiedServices"; @@ -753,10 +757,13 @@ public class ServiceHandler sset.getId()); } - // only start something if we don't have enought already going - ApiHandler apih = new ApiHandler(ev, this); - Thread t = new Thread(apih); - t.start(); + pendingRequests.add(new ApiHandler(ev, this)); + +// // only start something if we don't have enought already going +// ApiHandler apih = new ApiHandler(ev, this); +// Thread t = new Thread(apih); +// t.start(); + return new ServiceReplyEvent(ServiceCode.OK, "Service " + serviceIdString + " start request accepted, new instances[" + wanted + "]", sset.getKey(), @@ -833,9 +840,10 @@ public class ServiceHandler if ( tolose > 0 ) { - ApiHandler apih = new ApiHandler(ev, this); - Thread t = new Thread(apih); - t.start(); + pendingRequests.add(new ApiHandler(ev, this)); +// ApiHandler apih = new ApiHandler(ev, this); +// Thread t = new Thread(apih); +// t.start(); } return new ServiceReplyEvent(ServiceCode.OK, "Service " + serviceIdString + " stop request accepted for [" + tolose + "] instances.", sset.getKey(), sset.getId()); @@ -951,10 +959,10 @@ public class ServiceHandler } if ( sset.isRegistered() ) { - ApiHandler apih = new ApiHandler(ev, this); - - Thread t = new Thread(apih); - t.start(); + pendingRequests.add(new ApiHandler(ev, this)); +// ApiHandler apih = new ApiHandler(ev, this); +// Thread t = new Thread(apih); +// t.start(); return new ServiceReplyEvent(ServiceCode.OK, "Service " + serviceIdString + " modify request accepted.", sset.getKey(), sset.getId()); } else { return new ServiceReplyEvent(ServiceCode.NOTOK, "Service " + friendly + " is not a known service.", sset.getKey(), null); @@ -1005,9 +1013,10 @@ public class ServiceHandler if ( sset.isRegistered() ) { sset.deregister(); // just sets a flag so we know how to handle it when it starts to die - ApiHandler apih = new ApiHandler(ev, this); - Thread t = new Thread(apih); - t.start(); + pendingRequests.add(new ApiHandler(ev, this)); +// ApiHandler apih = new ApiHandler(ev, this); +// Thread t = new Thread(apih); +// t.start(); return new ServiceReplyEvent(ServiceCode.OK, "Service " + serviceIdString + " unregistered. Shutting down implementors.", sset.getKey(), sset.getId()); } else { return new ServiceReplyEvent(ServiceCode.NOTOK, "Service " + serviceIdString + " is not a registered service.", sset.getKey(), null); @@ -1184,12 +1193,12 @@ public class ServiceHandler { // Map of active service descriptors by endpoint. For UIMA services, key is the endpoint. - private HashMap<String, ServiceSet> servicesByName = new HashMap<String, ServiceSet>(); - private HashMap<Long, ServiceSet> servicesByFriendly = new HashMap<Long, ServiceSet>(); + private Map<String, ServiceSet> servicesByName = new HashMap<String, ServiceSet>(); + private Map<Long, ServiceSet> servicesByFriendly = new HashMap<Long, ServiceSet>(); // For each job, the collection of services it is dependent upon // DUccId is a Job Id (or id for serice that has dependencies) - private HashMap<DuccId, Map<String, ServiceSet>> servicesByJob = new HashMap<DuccId, Map<String, ServiceSet>>(); + private Map<DuccId, Map<String, ServiceSet>> servicesByJob = new HashMap<DuccId, Map<String, ServiceSet>>(); ServiceStateHandler() { Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java?rev=1454731&r1=1454730&r2=1454731&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java Sat Mar 9 15:09:12 2013 @@ -98,6 +98,8 @@ public class ServiceManagerComponent private String service_seqno = "service.seqno"; private DuccIdFactory idFactory = new DuccIdFactory(); + private boolean initialized = false; + public ServiceManagerComponent(CamelContext context) { super("ServiceManager", context); @@ -212,6 +214,10 @@ public class ServiceManagerComponent } idFactory = new DuccIdFactory(seq); + + synchronized(this) { + initialized = true; + } } @Override @@ -327,6 +333,7 @@ public class ServiceManagerComponent if ( w.getDuccType() == DuccType.Reservation ) continue; if ( !((DuccWorkJob)w).isActive() ) continue; // not active, we don't care about it. likely after restart. + logger.debug(methodName, w.getDuccId(), "Reconciling, adding", w.getDuccType()); switch(w.getDuccType()) { case Job: @@ -337,8 +344,8 @@ public class ServiceManagerComponent case Service: localMap.addDuccWork(w); // An arbitrary process is **almost** the same as a service in terms of how most of DUCC - // handles it. In order to transparently reuse all that code it is classified as a - // special type of service, "other", which the SM treats as a regular job. + // handles it. To me (SM), however, it is just like any other job so it goes into + // the job map. switch ( ((IDuccWorkService)w).getServiceDeploymentType() ) { case uima: @@ -357,7 +364,7 @@ public class ServiceManagerComponent } } - // Stuff on the left is stuff we have but or doesn't + // Stuff on the right is stuff we have but OR doesn't work = diffmap.getRight(); for ( IDuccWork w : work.values() ) { if ( w.getDuccType() == DuccType.Reservation ) continue; @@ -388,34 +395,54 @@ public class ServiceManagerComponent } } - // Now: stuff we both know about + // Now: stuff we both know about. Stuff on left is incoming. for( DuccMapValueDifference<IDuccWork> jd: diffmap ) { - IDuccWork r = jd.getRight(); IDuccWork l = jd.getLeft(); + IDuccWork r = jd.getRight(); if ( l.getDuccType() == DuccType.Reservation ) continue; logger.debug(methodName, l.getDuccId(), "Reconciling, incoming state = ", l.getStateObject(), " my state = ", r.getStateObject()); + boolean active = ((DuccWorkJob)l).isActive(); // Update our own state by replacing the old (right) object with the new (left) switch(l.getDuccType()) { case Job: - modifiedJobs.put(l.getDuccId(), l); - localMap.addDuccWork(l); + if ( active ) { + modifiedJobs.put(l.getDuccId(), l); + localMap.addDuccWork(l); + } else { + deletedJobs.put(l.getDuccId(), l); + localMap.removeDuccWork(l.getDuccId()); + } break; case Service: - localMap.addDuccWork(l); - switch ( ((IDuccWorkService)l).getServiceDeploymentType() ) - { - case uima: - case custom: - modifiedServices.put(l.getDuccId(), l); - break; - case other: - modifiedJobs.put(l.getDuccId(), l); - break; + if ( active ) { + localMap.addDuccWork(l); + switch ( ((IDuccWorkService)l).getServiceDeploymentType() ) + { + case uima: + case custom: + modifiedServices.put(l.getDuccId(), l); + break; + case other: + modifiedJobs.put(l.getDuccId(), l); + break; + } + } else { + localMap.removeDuccWork(l.getDuccId()); + switch ( ((IDuccWorkService)l).getServiceDeploymentType() ) + { + case uima: + case custom: + deletedServices.put(l.getDuccId(), l); + break; + case other: + deletedJobs.put(l.getDuccId(), l); + break; + } } break; @@ -658,6 +685,12 @@ public class ServiceManagerComponent public synchronized void orchestratorStateArrives(DuccWorkMap map) { + String methodName = "orchestratorStateArrives"; + if ( ! initialized ) { + logger.info(methodName, null, "SM not initialized, ignoring Orchestrator message."); + return; + } + epochCounter++; incomingMap = map; notify(); Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java?rev=1454731&r1=1454730&r2=1454731&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java Sat Mar 9 15:09:12 2013 @@ -71,7 +71,7 @@ public class ServiceSet // key is job/service id, value is same. it's a map for fast existence check HashMap<DuccId, DuccId> references = new HashMap<DuccId, DuccId>(); - // For a registered service, here is my registered it + // For a registered service, here is my registered id DuccId id; HashMap<Long, DuccId> friendly_ids = new HashMap<Long, DuccId>(); @@ -416,14 +416,17 @@ public class ServiceSet */ void enforceAutostart() { + String methodName = "enforceAutostart"; if ( ! autostart ) return; // not doing auto, nothing to do if ( stopped ) return; // doing auto, but we've been manually stopped // could have more implementors than instances if some were started dynamically but the count not persisted - int needed = Math.max(0, instances - implementors.size()); + int needed = Math.max(0, instances - friendly_ids.size()); + logger.debug(methodName, null, "ENFORCE: ", needed); while ( (needed--) > 0 ) { start(); + logger.debug(methodName, null, "ENFORCE: ", needed); } } @@ -1064,6 +1067,7 @@ public class ServiceSet { String methodName = "start"; + logger.debug(methodName, null, "START START START START START START START START"); // if ( service_type == ServiceType.Custom ) { // establish(); // return; @@ -1087,7 +1091,13 @@ public class ServiceSet }; for ( int i = 0; i < args.length; i++ ) { - logger.debug(methodName, null, "Args[", i, "]:", args[i]); + if ( i > 0 && (args[i-1].equals("-cp") ) ) { + // The classpaths can be just awful filling the logs with junk. It will end up in the agent log + // anyway so let's inhibit it here. + logger.debug(methodName, null, "Args[", i, "]: <CLASSPATH>"); + } else { + logger.debug(methodName, null, "Args[", i, "]:", args[i]); + } } ProcessBuilder pb = new ProcessBuilder(args); @@ -1124,8 +1134,19 @@ public class ServiceSet } // That was annoying. Now search the lines for some hint of the id. + boolean inhibit_cp = false; for ( String s : stdout_lines ) { - logger.debug(methodName, id, "Start stdout:", s); + if ( inhibit_cp ) { + // The classpaths can be just awful filling the logs with junk. It will end up in the agent log + // anyway so let's inhibit it here. + logger.debug(methodName, id, "<INHIBITED CP>"); + inhibit_cp = false; + } else { + logger.debug(methodName, id, "Start stdout:", s); + if ( s.indexOf("-cp") >= 0 ) { + inhibit_cp = true; + } + } } for ( String s : stderr_lines ) { @@ -1160,6 +1181,7 @@ public class ServiceSet setServiceState(ServiceState.Initializing); } saveMetaProperties(); + logger.debug(methodName, null, "ENDSTART ENDSTART ENDSTART ENDSTART ENDSTART ENDSTART"); } /**