Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java?rev=1708149&r1=1708148&r2=1708149&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java Mon Oct 12 16:10:55 2015 @@ -79,9 +79,14 @@ public class DbLoader String archive_key = IStateServices.archive_key; String archive_flag = IStateServices.archive_flag; - int nthreads = 40; + int nthreads = 20; AtomicInteger counter = new AtomicInteger(0); + //int joblimit = 10000; + //int reservationlimit = 10000; + //int servicelimit = 10000; + //int registrylimit = 10000; + int joblimit = Integer.MAX_VALUE; int reservationlimit = Integer.MAX_VALUE; int servicelimit = Integer.MAX_VALUE; @@ -111,6 +116,13 @@ public class DbLoader System.exit(1); } + jobHistory = from + jobHistory; + reservationHistory = from + reservationHistory; + serviceHistory = from + serviceHistory; + serviceRegistryHistory = from + serviceRegistryHistory; + serviceRegistry = from + serviceRegistry; + checkpointFile = from + checkpointFile; + f = new File(to); if ( ! f.isDirectory() ) { System.out.println("'to' must be a directory"); @@ -120,39 +132,26 @@ public class DbLoader String databasedir = to + "/database/databases"; String databasename = databasedir + "/DuccState"; // We always use a non-networked version for loading - state_url = "plocal:" + databasedir + "/DuccState"; + //state_url = "plocal:" + databasedir + "/DuccState"; + state_url = "remote:bluej538/DuccState"; System.setProperty("ducc.state.database.url", state_url); - f = new File(databasedir); - if ( f.exists() ) { - f = new File(databasename); - if ( f.exists() ) { - logger.info(methodName, null, "Dropping existing database."); - DbManager dbm = new DbManager(state_url, logger); - dbm.init(); - dbm.drop(); - dbm.shutdown(); - } - } else { - try { - if ( ! f.mkdirs() ) { - System.out.println("Cannot create database directory: " + databasedir); + if ( state_url.startsWith("plocal") ) { + f = new File(databasedir); + if ( !f.exists() ) { + try { + if ( ! 
f.mkdirs() ) { + System.out.println("Cannot create database directory: " + databasedir); + System.exit(1); + } + System.out.println("Created database directory " + databasedir); + } catch ( Exception e ) { + System.out.println("Cannot create database directory: " + databasedir + ":" + e.toString()); System.exit(1); } - System.out.println("Created database directory " + databasedir); - } catch ( Exception e ) { - System.out.println("Cannot create database directory: " + databasedir + ":" + e.toString()); - System.exit(1); } } - - jobHistory = from + jobHistory; - reservationHistory = from + reservationHistory; - serviceHistory = from + serviceHistory; - serviceRegistryHistory = from + serviceRegistryHistory; - serviceRegistry = from + serviceRegistry; - checkpointFile = from + checkpointFile; } void closeStream(InputStream in) @@ -164,6 +163,7 @@ public class DbLoader { String methodName = "loadJobs"; + logger.info(methodName, null, " -------------------- Load jobs ----------------"); File dir = new File(jobHistory); if ( !dir.isDirectory() ) { logger.info(methodName, null, "Cannot find job history; skipping load of jobs."); @@ -237,6 +237,7 @@ public class DbLoader { String methodName = "loadReservations"; + logger.info(methodName, null, " -------------------- Load reservations ----------------"); File dir = new File(reservationHistory); if ( ! dir.isDirectory() ) { logger.info(methodName, null, "No reservation directory found; skipping database load of reservations."); @@ -310,6 +311,8 @@ public class DbLoader public void loadServices() { String methodName = "loadServices"; + + logger.info(methodName, null, " -------------------- Load services ----------------"); File dir = new File(serviceHistory); if ( ! dir.isDirectory() ) { logger.info(methodName, null, "No service history directory found; skipping load of service history."); @@ -383,6 +386,8 @@ public class DbLoader { String methodName = "loadServiceRegistry"; + logger.info(methodName, null, " -------------------- Load registry; isHistory", isHistory, " ----------------"); + int c = 0; File dir = new File(registry); File[] files = dir.listFiles(); @@ -503,38 +508,60 @@ public class DbLoader throws Exception { String methodName = "run"; + long now = System.currentTimeMillis(); + + DbManager dbm = new DbManager(state_url, logger); + if ( dbm.checkForDatabase() ) { + dbm.init(); + dbm.drop(); + dbm.shutdown(); + } DbCreate cr = new DbCreate(state_url, logger); - cr.createPlocalDatabase(); + if ( state_url.startsWith("plocal") ) { + cr.createPlocalDatabase(); + } else { + cr.createDatabase(); + } logger.info(methodName, null, "storage.useWAL", System.getProperty("storage.useWAL")); logger.info(methodName, null, "tx.useLog", System.getProperty("tx.useLog")); if ( true ) { try { + OGlobalConfiguration.USE_WAL.setValue(false); + OGlobalConfiguration.USE_LOG.setValue(false); + OGlobalConfiguration.dumpConfiguration(System.out); hmd = new HistoryManagerDb(logger); + long nowt = System.currentTimeMillis(); if ( docheckpoint ) loadCheckpoint(); - - OGlobalConfiguration.USE_WAL.setValue(false); + logger.info(methodName, null, "***** Time to load checkpoint A ****", System.currentTimeMillis() - nowt); OGlobalConfiguration.dumpConfiguration(System.out); // ---------- Load job history + nowt = System.currentTimeMillis(); if ( dojobs ) loadJobs(); + logger.info(methodName, null, "**** Time to load jobs**** ", System.currentTimeMillis() - nowt); // ---------- Load reservation history + nowt = System.currentTimeMillis(); if ( doreservations ) 
loadReservations(); + logger.info(methodName, null, "**** Time to load reservations ****", System.currentTimeMillis() - nowt); // ---------- Load service isntance and AP history + nowt = System.currentTimeMillis(); if ( doservices ) loadServices(); + logger.info(methodName, null, "**** Time to load service instances ****", System.currentTimeMillis() - nowt); // ---------- Load service registry if ( doregistry ) { + nowt = System.currentTimeMillis(); ssd = new StateServicesDb(); ssd.init(logger); loadServiceRegistry(serviceRegistry, false); @@ -543,17 +570,21 @@ public class DbLoader } catch ( Exception e ) { e.printStackTrace(); } + logger.info(methodName, null, "**** Time to load Service registry ****", System.currentTimeMillis() - nowt); // ---------- Load service registry history + nowt = System.currentTimeMillis(); ssd = new StateServicesDb(); ssd.init(logger); loadServiceRegistry(serviceRegistryHistory, true); + logger.info(methodName, null, "**** Time to load Service history ****", System.currentTimeMillis() - nowt); } - OGlobalConfiguration.USE_WAL.setValue(true); - if ( docheckpoint ) loadCheckpoint(); - + nowt = System.currentTimeMillis(); + logger.info(methodName, null, "**** Total load time ****", System.currentTimeMillis() - now); + if ( docheckpoint ) loadCheckpoint(); + logger.info(methodName, null, "**** Time to reload checkpoint B ****", System.currentTimeMillis() - nowt); } catch ( Exception e ) { logger.error(methodName, null, e);
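For orientation, the DbLoader changes above amount to one bulk-load pattern: drop and recreate the target database (plocal or remote), switch off the OrientDB write-ahead and transaction logs, let HistoryManagerDb declare a massive-insert intent, and time each load phase. The sketch below condenses that sequence; it is illustrative only, not the committed code. DbManager, DbCreate, HistoryManagerDb and DuccLogger are the project's own classes exactly as they appear in the hunks above, and the import path for OGlobalConfiguration is assumed from OrientDB's usual layout.

// Condensed sketch of the load sequence this commit adds to DbLoader.run().
// Not the committed code; project classes and OrientDB import paths as noted above.
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.database.DbCreate;
import org.apache.uima.ducc.database.DbManager;
import org.apache.uima.ducc.database.HistoryManagerDb;

import com.orientechnologies.orient.core.config.OGlobalConfiguration;

public class DbLoadSequenceSketch
{
    void load(String stateUrl, DuccLogger logger)
        throws Exception
    {
        long start = System.currentTimeMillis();

        // 1. Drop any existing database so the load starts clean.
        DbManager dbm = new DbManager(stateUrl, logger);
        if ( dbm.checkForDatabase() ) {
            dbm.init();
            dbm.drop();
            dbm.shutdown();
        }

        // 2. Recreate it; "plocal:" URLs are created in-process, anything else remotely.
        DbCreate cr = new DbCreate(stateUrl, logger);
        if ( stateUrl.startsWith("plocal") ) cr.createPlocalDatabase();
        else                                 cr.createDatabase();

        // 3. Turn off the write-ahead and transaction logs for the duration of the bulk load.
        OGlobalConfiguration.USE_WAL.setValue(false);
        OGlobalConfiguration.USE_LOG.setValue(false);

        // 4. The HistoryManagerDb constructor declares OIntentMassiveInsert on its DbManager.
        HistoryManagerDb hmd = new HistoryManagerDb(logger);

        // 5. Each phase (checkpoint, jobs, reservations, services, registry) is timed
        //    the same way run() does it: snapshot the clock, load, log the delta.
        long phase = System.currentTimeMillis();
        // hmd-backed loadJobs() / loadReservations() / loadServices() would go here ...
        logger.info("load", null, "**** Time to load jobs ****", System.currentTimeMillis() - phase);

        logger.info("load", null, "**** Total load time ****", System.currentTimeMillis() - start);
    }
}

Disabling the WAL and declaring a massive-insert intent trades crash safety for throughput, which is acceptable here because the loader runs once against a freshly created database and can simply be rerun on failure.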
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java?rev=1708149&r1=1708148&r2=1708149&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java Mon Oct 12 16:10:55 2015 @@ -29,6 +29,7 @@ import org.apache.uima.ducc.database.DbC import org.apache.uima.ducc.database.DbConstants.DbVertex; import com.orientechnologies.orient.client.remote.OServerAdmin; +import com.orientechnologies.orient.core.intent.OIntent; import com.tinkerpop.blueprints.impls.orient.OrientGraph; import com.tinkerpop.blueprints.impls.orient.OrientGraphFactory; import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx; @@ -97,14 +98,39 @@ public class DbManager public void drop() throws Exception { - OrientGraphNoTx graphDb = factory.getNoTx(); // the graph instance + if ( dburl.startsWith("remote") ) { + OServerAdmin admin = null; + try { + String pw = dbPassword(); + admin = new OServerAdmin(dburl); + admin.connect("root", pw); // connect to the server + admin.dropDatabase("plocal"); + admin.close(); + } finally { + if ( admin != null ) admin.close(); + } + } else { + OrientGraphNoTx graphDb = factory.getNoTx(); // the graph instance + if ( graphDb == null ) { + throw new IllegalStateException("Cannot allocate graph instance for " + dburl); + } + graphDb.drop(); + } + } + + public synchronized DbHandle open() + throws Exception + { + OrientGraph graphDb = factory.getTx(); // the graph instance if ( graphDb == null ) { throw new IllegalStateException("Cannot allocate graph instance for " + dburl); } - graphDb.drop(); + + graphDb.setUseLightweightEdges(true); + return new DbHandle(this, graphDb); } - public synchronized DbHandle open() + public synchronized DbHandle openNoLog() throws Exception { OrientGraph graphDb = factory.getTx(); // the graph instance @@ -112,6 +138,8 @@ public class DbManager throw new IllegalStateException("Cannot allocate graph instance for " + dburl); } + graphDb.setUseLightweightEdges(true); + graphDb.getRawGraph().getTransaction().setUsingLog(false); return new DbHandle(this, graphDb); } @@ -123,7 +151,7 @@ public class DbManager if ( graphDb == null ) { throw new IllegalStateException("Cannot allocate graph instance for " + dburl); } - + graphDb.setUseLightweightEdges(true); return new DbHandle(this, graphDb); } @@ -150,6 +178,11 @@ public class DbManager factory.setupPool(1,20); } + public synchronized void declareIntent(OIntent intent) + { + factory.declareIntent(intent); + } + public synchronized void shutdown() { String methodName = "closeDatabase"; Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java?rev=1708149&r1=1708148&r2=1708149&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java (original) +++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java Mon Oct 12 16:10:55 2015 @@ -18,29 +18,18 @@ */ package org.apache.uima.ducc.database; -import java.io.IOException; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import org.apache.uima.ducc.common.DuccNode; -import org.apache.uima.ducc.common.IIdentity; -import org.apache.uima.ducc.common.Node; -import org.apache.uima.ducc.common.NodeIdentity; import org.apache.uima.ducc.common.SizeBytes; import org.apache.uima.ducc.common.main.DuccService; import org.apache.uima.ducc.common.utils.DuccLogger; import org.apache.uima.ducc.common.utils.id.DuccId; -import org.apache.uima.ducc.common.utils.id.IDuccId; import org.apache.uima.ducc.database.DbConstants.DbCategory; import org.apache.uima.ducc.database.DbConstants.DbEdge; import org.apache.uima.ducc.database.DbConstants.DbVertex; -import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent; -import org.apache.uima.ducc.transport.cmdline.ICommandLine; import org.apache.uima.ducc.transport.event.common.ADuccWork; import org.apache.uima.ducc.transport.event.common.DuccProcess; import org.apache.uima.ducc.transport.event.common.DuccProcessMap; @@ -54,41 +43,21 @@ import org.apache.uima.ducc.transport.ev import org.apache.uima.ducc.transport.event.common.IDuccPerWorkItemStatistics; import org.apache.uima.ducc.transport.event.common.IDuccProcess; import org.apache.uima.ducc.transport.event.common.IDuccProcessMap; -import org.apache.uima.ducc.transport.event.common.IDuccProcessWorkItems; import org.apache.uima.ducc.transport.event.common.IDuccReservation; import org.apache.uima.ducc.transport.event.common.IDuccReservationMap; -import org.apache.uima.ducc.transport.event.common.IDuccSchedulingInfo; -import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo; import org.apache.uima.ducc.transport.event.common.IDuccState.JobState; import org.apache.uima.ducc.transport.event.common.IDuccState.ReservationState; -import org.apache.uima.ducc.transport.event.common.IDuccUimaAggregateComponent; -import org.apache.uima.ducc.transport.event.common.IDuccUimaDeployableConfiguration; import org.apache.uima.ducc.transport.event.common.IDuccWork; import org.apache.uima.ducc.transport.event.common.IDuccWorkJob; import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation; import org.apache.uima.ducc.transport.event.common.IDuccWorkService; -import org.apache.uima.ducc.transport.event.common.IRationale; -import org.apache.uima.ducc.transport.event.common.ITimeWindow; import org.apache.uima.ducc.transport.event.common.JdReservationBean; import org.apache.uima.ducc.transport.event.common.history.IHistoryPersistenceManager; import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.InstanceCreator; -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import com.google.gson.JsonParseException; import com.google.gson.JsonParser; -import com.google.gson.JsonSerializationContext; -import com.google.gson.JsonSerializer; -import com.google.gson.TypeAdapter; -import com.google.gson.TypeAdapterFactory; -import com.google.gson.reflect.TypeToken; -import com.google.gson.stream.JsonReader; -import com.google.gson.stream.JsonToken; 
-import com.google.gson.stream.JsonWriter; +import com.orientechnologies.orient.core.intent.OIntentMassiveInsert; import com.orientechnologies.orient.core.record.impl.ODocument; import com.tinkerpop.blueprints.Direction; import com.tinkerpop.blueprints.Edge; @@ -119,6 +88,9 @@ public class HistoryManagerDb try { dbManager = new DbManager(dburl, logger); dbManager.init(); + // TODO TODO + dbManager.declareIntent(new OIntentMassiveInsert()); + logger.warn("<CTR>.HistoryManagerDb", null, "****MUST FIX DECLARE INTENT****"); } catch ( Exception e ) { logger.error("HisstoryManagerDb", null, "Cannot open the history database:", e); } @@ -132,66 +104,30 @@ public class HistoryManagerDb // ---------------------------------------------------------------------------------------------------- // Jobs section - Gson mkGsonForJob() - { - // We need to define Instance creators and such so we do it in a common place - GsonBuilder gb = new GsonBuilder(); - - GenericInterfaceAdapter customAdapter = new GenericInterfaceAdapter(); - gb.serializeSpecialFloatingPointValues().setPrettyPrinting(); - gb.enableComplexMapKeySerialization(); - - gb.registerTypeAdapter(Node.class, new NodeInstanceCreator()); - gb.registerTypeAdapter(NodeIdentity.class, new NodeIdentityCreator()); - - //gb.registerTypeAdapter(IIdentity.class, new IdentityInstanceCreator()); - gb.registerTypeAdapter(IIdentity.class, customAdapter); - - gb.registerTypeAdapter(IDuccId.class, customAdapter); - gb.registerTypeAdapter(ITimeWindow.class, customAdapter); - gb.registerTypeAdapter(IDuccProcessWorkItems.class, customAdapter); - gb.registerTypeAdapter(IDuccUimaAggregateComponent.class, customAdapter); - gb.registerTypeAdapter(IUimaPipelineAEComponent.class, customAdapter); - gb.registerTypeAdapter(IRationale.class, customAdapter); - gb.registerTypeAdapter(IDuccUimaDeployableConfiguration.class, customAdapter); - gb.registerTypeAdapter(IDuccStandardInfo.class, customAdapter); - gb.registerTypeAdapter(IDuccSchedulingInfo.class, customAdapter); - gb.registerTypeAdapter(IDuccPerWorkItemStatistics.class, customAdapter); - gb.registerTypeAdapter(IDuccReservationMap.class, customAdapter); - gb.registerTypeAdapter(JdReservationBean.class, customAdapter); - - //ConcurrentHashMap<DuccId, Long> x = new ConcurrentHashMap<DuccId, Long>(); - //gb.registerTypeAdapter(x.getClass(), new MapAdaptor()); - - //gb.registerTypeAdapterFactory(new DuccTypeFactory()); - //Object obj = new ArrayList<IJdReservation>(); - //gb.registerTypeAdapter(obj.getClass(), customAdapter); - Gson g = gb.create(); - return g; - } - /** * Common code to save a job in an open handle. Caller will commit or fail as needed. 
*/ - private void saveJobNoCommit(DbHandle h, IDuccWorkJob j, DbVertex type, DbCategory dbcat) + void saveJobNoCommit(DbHandle h, IDuccWorkJob j, DbVertex type, DbCategory dbcat) throws Exception { String methodName = "saveJobNoCommit"; Long nowP = System.currentTimeMillis(); // Nuke the command lines DuccWorkPopDriver driver = j.getDriver(); - ICommandLine driverCl = null; + //ICommandLine driverCl = null; IDuccProcessMap jdProcessMap = null; + int size = 0; + if ( driver != null ) { - driverCl = driver.getCommandLine(); - driver.setCommandLine(null); + //driverCl = driver.getCommandLine(); + //driver.setCommandLine(null); jdProcessMap = driver.getProcessMap(); driver.setProcessMap(null); } - ICommandLine jobCl = j.getCommandLine(); - j.setCommandLine(null); + //ICommandLine jobCl = j.getCommandLine(); + //j.setCommandLine(null); IDuccPerWorkItemStatistics stats = j.getSchedulingInfo().getPerWorkItemStatistics(); @@ -205,28 +141,30 @@ public class HistoryManagerDb IDuccProcessMap processMap = j.getProcessMap(); j.setProcessMap(null); - Gson g = mkGsonForJob(); + Gson g = DbHandle.mkGsonForJob(); String dbJob = g.toJson(j); - + size += dbJob.length(); + // Must repair these things because OR continues to use the job after it has been // written to history. j.setProcessMap(processMap); - j.setCommandLine(jobCl); + //j.setCommandLine(jobCl); if ( driver != null ) { - driver.setCommandLine(driverCl); + //driver.setCommandLine(driverCl); driver.setProcessMap(jdProcessMap); } - Object savedJob = h.saveObject(type, j.getDuccId().getFriendly(), dbJob, dbcat); + OrientVertex savedJob = h.saveObject(type, j.getDuccId().getFriendly(), dbJob, dbcat); - List<Object> savedJPs = new ArrayList<Object>(); - List<Object> savedJDs = new ArrayList<Object>(); + List<OrientVertex> savedJPs = new ArrayList<OrientVertex>(); + List<OrientVertex> savedJDs = new ArrayList<OrientVertex>(); for (DuccId did : processMap.keySet()) { Long pid = did.getFriendly(); IDuccProcess p = processMap.get(did); String proc = g.toJson(p); + size += proc.length(); savedJPs.add(h.saveObject(DbVertex.Process, pid, proc, dbcat)); // logger.info(methodName, p.getDuccId(), "2 ----------> Time to save process", System.currentTimeMillis() - nowP); @@ -239,6 +177,7 @@ public class HistoryManagerDb IDuccProcess p = jdProcessMap.get(did); String proc = g.toJson(p); + size += proc.length(); savedJDs.add(h.saveObject(DbVertex.Process, pid, proc, dbcat)); // logger.info(methodName, p.getDuccId(), "2 ----------> Time to save process", System.currentTimeMillis() - nowP); @@ -249,7 +188,7 @@ public class HistoryManagerDb h.addEdges(savedJob, savedJPs, DbEdge.JpProcess); - logger.info(methodName, j.getDuccId(), "----------> Time to save job", System.currentTimeMillis() - nowP); + logger.info(methodName, j.getDuccId(), "----------> Time to save job", System.currentTimeMillis() - nowP, "json size", size, "nprocesses", processMap.size()); } @@ -266,11 +205,13 @@ public class HistoryManagerDb Long id = j.getDuccId().getFriendly(); DbHandle h = null; try { + if ( safe ) { h = dbManager.open(); } else { - h = dbManager.openNoTx(); + h = dbManager.openNoLog(); } + if ( safe && h.thingInDatabase(id, type, dbcat) ) { logger.warn(methodName, j.getDuccId(), "Not overwriting saved job."); h.close(); @@ -288,7 +229,9 @@ public class HistoryManagerDb logger.error(methodName, j.getDuccId(), "Cannot store job", e); throw e; } finally { + Long nowP = System.currentTimeMillis(); h.commit(); + logger.info(methodName, j.getDuccId(), "Time to commit", 
System.currentTimeMillis() - nowP); h.close(); } } @@ -322,7 +265,7 @@ public class HistoryManagerDb String json = d.toJSON(); JsonObject jo = mkJsonObject(json); - Gson g = mkGsonForJob(); + Gson g = DbHandle.mkGsonForJob(); j = g.fromJson(jo, DuccWorkJob.class); // System.out.println(g.toJson(jo)); @@ -424,6 +367,7 @@ public class HistoryManagerDb throws Exception { String methodName = "saveReservationNoCommit"; + long now = System.currentTimeMillis(); List<JdReservationBean> l = r.getJdReservationBeanList(); if ( l != null ) { @@ -436,7 +380,7 @@ public class HistoryManagerDb } - long now = System.currentTimeMillis(); + Long id = r.getDuccId().getFriendly(); logger.info(methodName, r.getDuccId(), "Saving."); @@ -446,7 +390,7 @@ public class HistoryManagerDb IDuccReservationMap resmap = r.getReservationMap(); r.setReservationMap(null); - Gson g = mkGsonForJob(); + Gson g = DbHandle.mkGsonForJob(); String dbres = g.toJson(r); // logger.info(methodName, null, "------------------- Reservation JSON: " + dbres); @@ -455,9 +399,9 @@ public class HistoryManagerDb // written to history. r.setReservationMap(resmap); - Object savedRes = h.saveObject(DbVertex.Reservation, id, dbres, dbcat); + OrientVertex savedRes = h.saveObject(DbVertex.Reservation, id, dbres, dbcat); - List<Object> savedHosts = new ArrayList<Object>(); + List<OrientVertex> savedHosts = new ArrayList<OrientVertex>(); for (DuccId did : resmap.keySet()) { Long pid = did.getFriendly(); @@ -532,7 +476,7 @@ public class HistoryManagerDb String json = d.toJSON(); JsonObject jo = mkJsonObject(json); - Gson g = mkGsonForJob(); + Gson g = DbHandle.mkGsonForJob(); // logger.info(methodName, null, g.toJson(jo)); r = g.fromJson(jo, DuccWorkReservation.class); @@ -705,6 +649,7 @@ public class HistoryManagerDb throws Exception { String methodName = "checkpoint"; + long now = System.currentTimeMillis(); boolean ret = true; DbHandle h = null; @@ -730,6 +675,10 @@ public class HistoryManagerDb saveJobNoCommit(h, (IDuccWorkJob) w, DbVertex.ServiceInstance, DbCategory.Checkpoint); break; case Reservation: + if ( w.getDuccId().getFriendly() == 282282 ) { + int x = 0; + x++; + } saveReservationNoCommit(h, (IDuccWorkReservation) w, DbCategory.Checkpoint); break; default: @@ -737,7 +686,7 @@ public class HistoryManagerDb } } - Gson g = mkGsonForJob(); + Gson g = DbHandle.mkGsonForJob(); ProcessToJobList l = new ProcessToJobList(processToJob); String json = g.toJson(l, l.getClass()); // logger.info(methodName, null, "ProcessToJob:", json); @@ -751,7 +700,9 @@ public class HistoryManagerDb if ( h != null ) h.close(); if ( ret ) logger.info(methodName, null, "Saved Orchestrator Checkpoint"); } - return ret; + + logger.info(methodName, null, "Total time to save checkpoint:", System.currentTimeMillis() - now); + return ret; } /** @@ -805,7 +756,7 @@ public class HistoryManagerDb String json = d.toJSON(); logger.info(methodName, null, json); - Gson g = mkGsonForJob(); + Gson g = DbHandle.mkGsonForJob(); ProcessToJobList l = g.fromJson(json, ProcessToJobList.class); l.fill(processToJob); @@ -844,168 +795,4 @@ public class HistoryManagerDb // End of common // ---------------------------------------------------------------------------------------------------- - - // ---------------------------------------------------------------------------------------------------- - // Instance creators and adaptors for GSON - // ---------------------------------------------------------------------------------------------------- - - // We need these for the DuccNode and 
NodeIdentity because they don't have no-arg - // Constructors. - // - // @TODO after merge, consult with Jerry about adding in those constructors - private class NodeInstanceCreator implements InstanceCreator<Node> { - public Node createInstance(Type type) { - // System.out.println("DuccNode"); - return new DuccNode(null, null, false); - } - } - - private class NodeIdentityCreator implements InstanceCreator<NodeIdentity> { - public NodeIdentity createInstance(Type type) { - // System.out.println("DuccNodeIdentity"); - try { return new NodeIdentity(null, null); } catch ( Exception e ) {} - return null; - } - } - - /** - * JSON helper for our complex objects. Gson doesn't save type information in the json so - * it doesn't know how to construct things declared as interfaces. - * - * This class is a Gson adapter that saves the actual object type in the json on serialization, - * and uses that information on deserialization to construct the right thing. - */ - private class GenericInterfaceAdapter - implements - JsonSerializer<Object>, - JsonDeserializer<Object> - { - - private static final String DUCC_META_CLASS = "DUCC_META_CLASS"; - - @Override - public Object deserialize(JsonElement jsonElement, - Type type, - JsonDeserializationContext jsonDeserializationContext) - throws JsonParseException - { - // Reconstitute the "right" class based on the actual class it came from as - // found in metadata - JsonObject obj = jsonElement.getAsJsonObject(); - JsonElement clElem= obj.get(DUCC_META_CLASS); - - if ( clElem== null ) { - throw new IllegalStateException("Cannot determine concrete class for " + type + ". Must register explicit type adapter for it."); - } - String clName = clElem.getAsString(); - - //System.out.println("----- elem: " + clName + " clElem: " + obj); - try { - Class<?> clz = Class.forName(clName); - return jsonDeserializationContext.deserialize(jsonElement, clz); - } catch (ClassNotFoundException e) { - throw new JsonParseException(e); - } - } - - @Override - public JsonElement serialize(Object object, - Type type, - JsonSerializationContext jsonSerializationContext) - { - // Add the mete element indicating what kind of concrete class is this came from - //String n = object.getClass().getCanonicalName(); - //System.out.println("**** Serialize object A " + n + " of type " + type); - //if ( n.contains("Concurrent") ) { - // int stop = 1; - // stop++; - //} - - JsonElement ele = jsonSerializationContext.serialize(object, object.getClass()); - //System.out.println("**** Serialize object B " + object.getClass().getCanonicalName() + " of type " + type + " : ele " + ele); - ele.getAsJsonObject().addProperty(DUCC_META_CLASS, object.getClass().getCanonicalName()); - return ele; - } - } - - @SuppressWarnings("unused") - private class DuccTypeFactory - implements TypeAdapterFactory - { - - public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> typeToken) - { - //System.out.println("TYPETOKEN: " + typeToken + " raw type: " + typeToken.getRawType().getName()); - Class<?> cl = typeToken.getRawType(); - //System.out.println(" Canonical name: " + cl.getCanonicalName()); - Type type = typeToken.getType(); - if ( typeToken.getRawType() != ConcurrentHashMap.class ) { - //System.out.println("Skipping type " + typeToken); - return null; - } - - if ( type instanceof ParameterizedType ) { - - ParameterizedType pt = (ParameterizedType) type; - Type[] types = pt.getActualTypeArguments(); - //for ( Type tt : types ) { - // System.out.println(" TYPE ARGUMENTS: " + tt); - //} - Type tt = types[0]; - 
Class<?> cll = (Class<?>) tt; - - } - return null; - } - - } - - @SuppressWarnings("unused") - private class MapAdaptor - extends TypeAdapter<ConcurrentHashMap<DuccId, Long>> - { - - public void write(JsonWriter out, ConcurrentHashMap<DuccId, Long> map) throws IOException { - System.out.println("***************** Writing"); - if (map == null) { - out.nullValue(); - return; - } - - out.beginArray(); - for (DuccId k : map.keySet() ) { - out.beginObject(); - out.value(k.getFriendly()); - out.value(k.getUnique()); - out.value(map.get(k)); - out.endObject(); - } - out.endArray(); - } - - public ConcurrentHashMap<DuccId, Long> read(JsonReader in) throws IOException { - System.out.println("***************** reading"); - if (in.peek() == JsonToken.NULL) { - in.nextNull(); - return null; - } - - ConcurrentHashMap<DuccId, Long> ret = new ConcurrentHashMap<DuccId, Long>(); - in.beginArray(); - while (in.hasNext()) { - in.beginObject(); - Long friendly = in.nextLong(); - String unique = in.nextString(); - - Long val = in.nextLong(); - in.endObject(); - DuccId id = new DuccId(friendly); - id.setUUID(UUID.fromString(unique)); - ret.put(id, val); - } - in.endArray(); - return ret; - } - } - } Added: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java?rev=1708149&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java Mon Oct 12 16:10:55 2015 @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +*/ + +package org.apache.uima.ducc.database; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.uima.ducc.common.persistence.rm.IRmPersistence; +import org.apache.uima.ducc.common.utils.DuccLogger; +import org.apache.uima.ducc.database.DbConstants.DbCategory; +import org.apache.uima.ducc.database.DbConstants.DbVertex; + +import com.google.gson.Gson; +import com.tinkerpop.blueprints.impls.orient.OrientVertex; + +/** + * Manage saving and fetching of transient RM state. The primary consumer is + * intended to be the WS. 
+ */ +public class RmStatePersistence + implements IRmPersistence +{ + + DbManager dbManager = null; + DuccLogger logger = null; + public RmStatePersistence() + { + } + + private boolean init(String dburl) + throws Exception + { + boolean ret = false; + try { + dbManager = new DbManager(dburl, logger); + dbManager.init(); + ret = true; + } catch ( Exception e ) { + throw e; + } + return ret; + } + + public void init(DuccLogger logger) + throws Exception + { + this.logger = logger; + String stateUrl = System.getProperty("ducc.state.database.url"); + init(stateUrl); + } + + public void clear() + throws Exception + { + String methodName = "clear"; + DbHandle h = null; + try { + h = dbManager.open(); + h.execute("DELETE VERTEX V where " + DbConstants.DUCC_DBCAT + "='" + DbCategory.RmState.pname() + "'"); + } catch ( Exception e ) { + logger.error(methodName, null, "Cannot clear the database.", e); + } finally { + if ( h != null ) h.close(); + } + } + + public String toGson(Object o) + { + // We need to define Instance creators and such so we do it in a common place + Gson g = DbHandle.mkGsonForJob(); + return g.toJson(o); + } + + public Object createMachine(String m, Properties props) + throws Exception + { + String methodName = "createMachine"; + DbHandle h = dbManager.open(); + Object ret = null; + try { + OrientVertex v = h.createProperties(DbConstants.DUCC_DBNODE, m, DbVertex.RmNode, DbCategory.RmState, props); + ret = v.getId(); + h.commit(); + } catch ( Exception e ) { + logger.error(methodName, null, "Update", m, "ROLLBACK: ", e); + if ( h != null ) h.rollback(); + } finally { + if ( h != null ) h.close(); + } + return ret; + } + + public void setProperties(Object dbid, String dbk, Object... props) + throws Exception + { + String methodName = "setProperties"; + + long now = System.currentTimeMillis(); + if (( props.length % 2) != 0 ) { + throw new IllegalStateException("Set properties: number of properties must be even, instead was " + props.length); + } + + DbHandle h = dbManager.open(); + + try { + h.updateProperties(dbid, props); + h.commit(); + } catch ( Exception e ) { + logger.error(methodName, null, "Update", dbk, "ROLLBACK: ", e); + if ( h != null ) h.rollback(); + } finally { + if ( h != null ) h.close(); + logger.info(methodName, null, "Total time to update properties on", dbid.toString(), System.currentTimeMillis() - now); + + } + + } + + public void setProperty(Object dbid, String dbk, RmPropName k, Object v) + throws Exception + { + String methodName = "setProperty"; + long now = System.currentTimeMillis(); + + DbHandle h = dbManager.open(); + + try { + h.updateProperty(dbid, k.pname(), v); + h.commit(); + } catch ( Exception e ) { + logger.error(methodName, null, "Update", dbk, "ROLLBACK: ", e); + if ( h != null ) h.rollback(); + } finally { + if ( h != null ) h.close(); + logger.info(methodName, null, "Total time to update property on", dbid.toString(), System.currentTimeMillis() - now); + } + + } + + public Properties getMachine(String m) + throws Exception + { + return null; + } + + public Map<String, Properties> getAllMachines() + throws Exception + { + return new HashMap<String, Properties>(); + } + + public static void main(String[] args) + { + } + +} + +/** + String name; + String nodepoolId; + long memory; + int order; + boolean blacklisted; // UIMA-4142 + boolean online; // UIMA-4234 + boolean responsive; // UIMA-4234 + + + + + Properties file for a node + name = string + ip = string + state = <state> + states: vary status: online | offline + reporting : present | 
absent + nodepool = string + quantum = string + class = string + scheduling policy = string + scheduled work = list of duccids of work on the node + */ Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java?rev=1708149&r1=1708148&r2=1708149&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java Mon Oct 12 16:10:55 2015 @@ -32,6 +32,8 @@ import org.apache.uima.ducc.common.utils import org.apache.uima.ducc.database.DbConstants.DbCategory; import org.apache.uima.ducc.database.DbConstants.DbVertex; +import com.tinkerpop.blueprints.impls.orient.OrientVertex; + public class StateServicesDb implements IStateServices { @@ -122,6 +124,8 @@ public class StateServicesDb throws Exception { String methodName = "getStateServicesDirectory"; + long now = System.currentTimeMillis(); + StateServicesDirectory ret = new StateServicesDirectory(); if ( dbManager== null ) { @@ -129,10 +133,10 @@ public class StateServicesDb return ret; // avoid NPE in caller } - DbHandle h = dbManager.open(); + DbHandle h = dbManager.openNoTx(); try { - Map<Long, Properties> svcset = h.getPropertiesForType(DbVertex.ServiceReg , DbCategory.SmReg); - Map<Long, Properties> metaset = h.getPropertiesForType(DbVertex.ServiceMeta, DbCategory.SmReg); + Map<Long, Properties> svcset = h.getPropertiesForTypeSel(DbVertex.ServiceReg , DbCategory.SmReg); + Map<Long, Properties> metaset = h.getPropertiesForTypeSel(DbVertex.ServiceMeta, DbCategory.SmReg); for ( Long k : svcset.keySet() ) { logger.trace(methodName, null, "Handling key", k); @@ -147,7 +151,8 @@ public class StateServicesDb } finally { if ( h != null ) h.close(); } - + + logger.info(methodName, null, "Time to read service registy", System.currentTimeMillis() - now); return ret; } @@ -176,15 +181,20 @@ public class StateServicesDb h = dbManager.openNoTx(); } + if ( meta_props.containsKey("meta_dbid")) return false; // if it's assigned, it came from the db so we know it's already there + Long id = serviceId.getFriendly(); - if ( safe ) { - if ( h.thingInDatabase(id, DbVertex.ServiceReg, category) ) { - return false; - } - } + OrientVertex ov_svc = h.createProperties(DbConstants.DUCCID, id, DbVertex.ServiceReg , category, svc_props); + OrientVertex ov_meta = h.createProperties(DbConstants.DUCCID, id, DbVertex.ServiceMeta, category, meta_props); + + Object dbid = ov_svc.getId(); + meta_props.put("svc_dbid", dbid); + ov_meta.setProperty("svc_dbid", dbid); + + dbid = ov_meta.getId(); + meta_props.put("meta_dbid", dbid); + ov_meta.setProperty("meta_dbid", dbid); - h.createPropertiesObject(svc_props, DbVertex.ServiceReg, id, category); - h.createPropertiesObject(meta_props, DbVertex.ServiceMeta, id, category); h.commit(); return true; } catch ( Exception e ) { @@ -224,14 +234,21 @@ public class StateServicesDb { // All we need to do is re-sync the final properties, and be sure to set DUCC_HISTORY to false String methodName = "moveToHistory"; - Long id = serviceId.getFriendly(); DbHandle h = null; try { h = dbManager.open(); // get new connection from the pool - h.syncProperties(job_props, 
DbVertex.ServiceReg, id, DbCategory.History); - h.syncProperties(meta_props, DbVertex.ServiceMeta, id, DbCategory.History); + Object svc_dbid = meta_props.get("svc_dbid"); + Object meta_dbid = meta_props.get("meta_dbid"); + OrientVertex obj_reg = h.updateProperties(svc_dbid, job_props); + OrientVertex obj_meta = h.updateProperties(meta_dbid, meta_props); + + h.changeCategory(obj_reg, DbCategory.History); + h.changeCategory(obj_meta, DbCategory.History); h.commit(); + + // h.syncProperties(job_props, DbVertex.ServiceReg, id, DbCategory.History); + // h.syncProperties(meta_props, DbVertex.ServiceMeta, id, DbCategory.History); } catch ( Exception e ) { logger.error(methodName, serviceId, "ROLLBACK: ", e); if ( h != null ) h.rollback(); @@ -249,20 +266,24 @@ public class StateServicesDb * the service is being modified, it could also be the registration. * @param type The type enum, ususally Service or ServiceMeta. */ - private boolean updateProperties(DuccId serviceId, Properties props, DbVertex type) + private boolean updateProperties(Object dbid, DuccId serviceId, Properties props, DbVertex type) { String methodName = "updatePropeties"; DbHandle h = null; try { h = dbManager.open(); - h.syncProperties(props, type, serviceId.getFriendly(), DbCategory.SmReg); + h.updateProperties(dbid, props); + // h.synchronizeProperties(DbConstants.DUCCID, serviceId.getFriendly(), type, DbCategory.SmReg, props); + // h.synchronizeProperties(dbid, props); + h.commit(); + // h.syncProperties(props, type, serviceId.getFriendly(), DbCategory.SmReg); return true; } catch ( Exception e ) { logger.error(methodName, serviceId, "ROLLBACK:", e); if ( h != null ) h.rollback(); return false; - } finally { + } finally { if ( h != null ) h.close(); } } @@ -270,17 +291,17 @@ public class StateServicesDb /** * Update the service registration. */ - public boolean updateJobProperties(DuccId serviceId, Properties props) + public boolean updateJobProperties(Object dbid, DuccId serviceId, Properties props) { - return updateProperties(serviceId, props, DbVertex.ServiceReg); + return updateProperties(dbid, serviceId, props, DbVertex.ServiceReg); } /** * Update the service meta data. 
*/ - public boolean updateMetaProperties(DuccId serviceId, Properties props) + public boolean updateMetaProperties(Object dbid, DuccId serviceId, Properties props) { - return updateProperties(serviceId, props, DbVertex.ServiceMeta); + return updateProperties(dbid, serviceId, props, DbVertex.ServiceMeta); } /** Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/misc/SmLoader.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/misc/SmLoader.java?rev=1708149&r1=1708148&r2=1708149&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/misc/SmLoader.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/misc/SmLoader.java Mon Oct 12 16:10:55 2015 @@ -6,6 +6,7 @@ import java.io.IOException; import java.util.Properties; import org.apache.uima.ducc.common.utils.DuccLogger; +import org.apache.uima.ducc.database.DbConstants; import org.apache.uima.ducc.database.DbConstants.DbCategory; import org.apache.uima.ducc.database.DbConstants.DbEdge; import org.apache.uima.ducc.database.DbConstants.DbVertex; @@ -13,8 +14,11 @@ import org.apache.uima.ducc.database.DbH import org.apache.uima.ducc.database.DbLoader; import org.apache.uima.ducc.database.DbManager; +import com.tinkerpop.blueprints.impls.orient.OrientVertex; + /** * Toy orientdb loader to load a service registry into OrientDb + * WARNING: No longer correct. */ public class SmLoader @@ -71,14 +75,14 @@ public class SmLoader String cps = (String) svc_props.remove("classpath"); //String ping_cps = (String) svc_props.remove("service_ping_classpath"); h = dbManager.open(); - Object svc = h.createPropertiesObject(svc_props, DbVertex.ServiceInstance, id, dbcat); - Object meta = h.createPropertiesObject(meta_props, DbVertex.ServiceMeta, id, dbcat); + OrientVertex svc = h.createProperties(DbConstants.DUCCID, id, DbVertex.ServiceReg, dbcat, svc_props); + OrientVertex meta = h.createProperties(DbConstants.DUCCID, id, DbVertex.ServiceMeta, dbcat, meta_props); h.addEdge(svc, meta, DbEdge.ServiceMeta); if ( cps != null ) { Properties cp_props = new Properties(); cp_props.put("classpath", cps); - Object cp = h.createPropertiesObject(cp_props, DbVertex.Classpath, id, dbcat); + OrientVertex cp = h.createProperties(DbConstants.DUCCID, id, DbVertex.Classpath, dbcat, cp_props); h.addEdge(svc, cp, DbEdge.Classpath); } Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java?rev=1708149&r1=1708148&r2=1708149&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java Mon Oct 12 16:10:55 2015 @@ -56,6 +56,14 @@ public class NodeStability { String methodName = "missedNode"; logger.warn(methodName, null, "*** Missed heartbeat ***", n.getNodeIdentity().getName(), "count[", c, "]"); + scheduler.nodeHb(n, c); + } + + public void nodeRecovers(Node n) + { + String methodName = "nodeRecovers"; + logger.info(methodName, null, "*** Node recovers 
***", n.getNodeIdentity().getName()); + scheduler.nodeHb(n, 0); } public void nodeArrives(Node n) Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/event/ResourceManagerEventListener.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/event/ResourceManagerEventListener.java?rev=1708149&r1=1708148&r2=1708149&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/event/ResourceManagerEventListener.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/event/ResourceManagerEventListener.java Mon Oct 12 16:10:55 2015 @@ -20,6 +20,7 @@ package org.apache.uima.ducc.rm.event; import org.apache.camel.Body; import org.apache.uima.ducc.common.ANodeStability; +import org.apache.uima.ducc.common.utils.DuccLogger; import org.apache.uima.ducc.rm.ResourceManager; import org.apache.uima.ducc.rm.scheduler.SchedConstants; import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher; @@ -33,7 +34,7 @@ public class ResourceManagerEventListene implements DuccEventDelegateListener, SchedConstants { - //private static DuccLogger logger = DuccLogger.getLogger(ResourceManagerEventListener.class, COMPONENT_NAME); + private static DuccLogger logger = DuccLogger.getLogger(ResourceManagerEventListener.class, COMPONENT_NAME); private String targetEndpoint; private ResourceManager rm; @@ -99,8 +100,8 @@ public class ResourceManagerEventListene */ public void onOrchestratorStateUpdateEvent(@Body OrchestratorStateDuccEvent duccEvent) throws Exception { - //String methodName = "onOrchestratorStateUpdateEvent"; - //logger.info(methodName, null, "Event arrives"); + String methodName = "onOrchestratorStateUpdateEvent"; + logger.info(methodName, null, "Event arrives"); rm.onOrchestratorStateUpdate(duccEvent.getWorkMap()); } Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java?rev=1708149&r1=1708148&r2=1708149&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java Mon Oct 12 16:10:55 2015 @@ -46,6 +46,7 @@ public interface ISchedulerMain void nodeArrives(Node n); void nodeDeath(Map<Node, Node> n); + void nodeHb(Node n, int count); void signalCompletion(DuccId id); void signalInitialized(IRmJob id); void signalCompletion(IRmJob job, Share share); Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java?rev=1708149&r1=1708148&r2=1708149&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java Mon Oct 12 16:10:55 2015 @@ -25,6 +25,9 @@ import 
org.apache.uima.ducc.common.Node; import org.apache.uima.ducc.common.NodeIdentity; import org.apache.uima.ducc.common.admin.event.RmQueriedMachine; import org.apache.uima.ducc.common.admin.event.RmQueriedShare; +import org.apache.uima.ducc.common.persistence.rm.IRmPersistence; +import org.apache.uima.ducc.common.persistence.rm.IRmPersistence.RmPropName; +import org.apache.uima.ducc.common.persistence.rm.RmPersistenceFactory; import org.apache.uima.ducc.common.utils.DuccLogger; import org.apache.uima.ducc.common.utils.id.DuccId; @@ -36,10 +39,10 @@ public class Machine private static DuccLogger logger = DuccLogger.getLogger(Machine.class, COMPONENT_NAME); private String id; - private int hbcounter = 0; // heartbeat counter private long memory; // in Kb private int share_order = 1; + private int heartbeats = 0; private NodePool nodepool; @@ -72,12 +75,15 @@ public class Machine Node node; private HashMap<Share, Share> activeShares = new HashMap<Share, Share>(); + private IRmPersistence persistence = null; + Object dbid = null; public Machine(Node node) { this.node = node; this.memory = node.getNodeMetrics().getNodeMemory().getMemTotal(); this.id = node.getNodeIdentity().getName(); + this.persistence = RmPersistenceFactory.getInstance(this.getClass().getName(), "RM"); } // public Machine(String id, long memory) @@ -162,19 +168,44 @@ public class Machine return blacklistedWork.size() > 0; } - public synchronized void heartbeat_down() + public synchronized void heartbeatArrives() { - hbcounter = 0; + String methodName = "heartbeatArrives"; + long now = System.currentTimeMillis(); + if ( heartbeats == 0 ) return; // no need to rereset it + try { + logger.info(methodName, null, "Reset heartbeat to 0"); + persistence.setProperty(dbid, id, RmPropName.Heartbeats, 0); + logger.info(methodName, null, "Time to reset heartbeat", System.currentTimeMillis() - now); + } catch (Exception e) { + logger.warn(methodName, null, "Cannot update heartbeat count in database:", e); + } } - public synchronized void heartbeat_up() + public synchronized void heartbeatMissed(int c) { - hbcounter++; + String methodName = "heartbeatMissed"; + long now = System.currentTimeMillis(); + + if ( c < 2 ) return; // we allow a couple because timing and races can create false negatives + try { + heartbeats = c; + logger.info(methodName, null, "Missed heartbeat count", c); + persistence.setProperty(dbid, id, RmPropName.Heartbeats, c); + logger.info(methodName, null, "Time to record misssed heartbeat", System.currentTimeMillis() - now); + } catch (Exception e) { + logger.warn(methodName, null, "Cannot update heartbeat count in database:", e); + } } - public synchronized int get_heartbeat() + Object getDbId() { - return hbcounter; + return this.dbid; + } + + void setDbId(Object dbid) + { + this.dbid = dbid; } public NodeIdentity getNodeIdentity() @@ -227,12 +258,6 @@ public class Machine return activeShares.size(); } - public void assignShare(Share s) - { - activeShares.put(s, s); - shares_left -= s.getShareOrder(); - } - HashMap<Share, Share> getActiveShares() { return activeShares; @@ -296,11 +321,35 @@ public class Machine this.virtual_share_order = share_order - blacklisted_shares; } + public void assignShare(Share s) + { + String methodName = "assignShare"; + long now = System.currentTimeMillis(); + activeShares.put(s, s); + shares_left -= s.getShareOrder(); + try { + persistence.setProperties(dbid, id, RmPropName.Assignments.pname(), activeShares.size(), RmPropName.SharesLeft.pname(), shares_left); + logger.info(methodName, 
null, "Time to assign share in db", System.currentTimeMillis() - now); + } catch (Exception e) { + logger.warn(methodName, null, "Cannot save state; shares_left", shares_left); + } + + } + public void removeShare(Share s) { + String methodName = "removeShare"; + long now = System.currentTimeMillis(); + activeShares.remove(s); nodepool.removeShare(s); shares_left += s.getShareOrder(); + try { + persistence.setProperties(dbid, id, RmPropName.Assignments.pname(), activeShares.size(), RmPropName.SharesLeft.pname(), shares_left); + logger.info(methodName, null, "Time to remove share in db", System.currentTimeMillis() - now); + } catch (Exception e) { + logger.warn(methodName, null, "Cannot save state; shares_left", shares_left); + } } /** Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java?rev=1708149&r1=1708148&r2=1708149&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java Mon Oct 12 16:10:55 2015 @@ -27,9 +27,13 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.uima.ducc.common.Node; import org.apache.uima.ducc.common.NodeIdentity; +import org.apache.uima.ducc.common.persistence.rm.IRmPersistence; +import org.apache.uima.ducc.common.persistence.rm.IRmPersistence.RmPropName; +import org.apache.uima.ducc.common.persistence.rm.RmPersistenceFactory; import org.apache.uima.ducc.common.utils.DuccLogger; import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType; @@ -97,6 +101,8 @@ class NodePool HashMap<Integer, Map<Node, Machine>> virtualMachinesByOrder = new HashMap<Integer, Map<Node, Machine>>(); // UIMA-4142 GlobalOrder maxorder = null; + IRmPersistence persistence = null; + // NodePool(NodePool parent, String id, EvictionPolicy ep, int order) // { // this.parent = parent; @@ -126,6 +132,8 @@ class NodePool } else { maxorder = parent.getGlobalOrder(); } + + persistence = RmPersistenceFactory.getInstance(this.getClass().getName(), "RM"); } void addResourceClass(ResourceClass cl) @@ -986,6 +994,88 @@ class NodePool } } + + void signalDb(Machine m, RmPropName key, Object value) + { + String methodName = "signalDb"; + try { + persistence.setProperty(m.getDbId(), m.getNode().getNodeIdentity().getName(), key, value); + } catch (Exception e) { + logger.warn(methodName, null, "Cannot update DB property", key, "for machine", m); + } + } + + Properties initDbProperties(Machine m) + { + Node n = m.getNode(); + NodeIdentity nid = n.getNodeIdentity(); + + Properties props = new Properties(); + props.setProperty(RmPropName.Name.pname(), nid.getName()); + props.setProperty(RmPropName.Ip.pname(), nid.getIp()); + props.setProperty(RmPropName.Nodepool.pname(), id); + props.put(RmPropName.Quantum.pname(), share_quantum); + + props.put(RmPropName.Memory.pname() , m.getMemory()); + props.put(RmPropName.ShareOrder.pname() , m.getShareOrder()); + props.put(RmPropName.Blacklisted.pname() , m.isBlacklisted()); + + // init these here, but must be maintained by machine + props.put(RmPropName.Heartbeats.pname() , 0); + props.put(RmPropName.SharesLeft.pname() 
, m.countFreeShares()); // qshares remaining + props.put(RmPropName.Assignments.pname() , m.countProcesses()); // processes + + return props; + } + + // /** + // * On init, seed the database with everything we know about nodes. + // * TODO: not used - do we care? + // */ + // void initializeDbx() + // { + // String methodName = "initializeDb"; + // for ( NodePool np : children.values() ) { + // np.initializeDbx(); + // } + + // for (Node n : allMachines.keySet()) { + // Machine m = allMachines.get(n); + // Properties props = initDbProperties(m); + // props.put(RmPropName.Responsive.pname(), true); + // props.put(RmPropName.Online.pname(), true); + // try { + // persistence.createMachine(m.getId(), props); + // } catch (Exception e) { + // logger.warn(methodName, null, "Cannot store (online) node", m.getId(), "in db:", e); + // } + // } + // for (Node n : unresponsiveMachines.keySet()) { + // Machine m = unresponsiveMachines.get(n); + // Properties props = initDbProperties(m); + // props.setProperty(RmPropName.Responsive.pname(), "false"); + // props.setProperty(RmPropName.Online.pname(), "true"); + // try { + // persistence.createMachine(m.getId(), props); + // } catch (Exception e) { + // logger.warn(methodName, null, "Cannot store (unresponsive) node", m.getId(), "in db:", e); + // } + // } + // for (Node n : offlineMachines.keySet()) { + // Machine m = offlineMachines.get(n); + // Properties props = initDbProperties(m); + // props.setProperty(RmPropName.Responsive.pname(), "unknown"); + // props.setProperty(RmPropName.Online.pname(), "false"); + // try { + // persistence.createMachine(m.getId(), props); + // } catch (Exception e) { + // logger.warn(methodName, null, "Cannot store (offline) node", m.getId(), "in db:", e); + // } + // } + + // } + + /** * Handle a new node update. */ @@ -1010,6 +1100,7 @@ class NodePool if ( offlineMachines.containsKey(node) ) { // if it's offline it can't be restored like this. 
Machine m = offlineMachines.get(node); + signalDb(m, RmPropName.Responsive, true); logger.trace(methodName, null, "Node ", m.getId(), " is offline, not activating."); return m; } @@ -1034,14 +1125,15 @@ class NodePool mlist.put(m.key(), m); total_shares += order; // UIMA-3939 - + signalDb(m, RmPropName.Responsive, true); logger.info(methodName, null, "Nodepool:", id, "Host reactivated ", m.getId(), String.format("shares %2d total %4d:", order, total_shares), m.toString()); return m; } Machine machine = new Machine(node); // brand new machine, make it active + Node key = machine.key(); machine.setShareOrder(order); - allMachines.put(machine.key(), machine); // global list + allMachines.put(key, machine); // global list machinesByName.put(machine.getId(), machine); machinesByIp.put(machine.getIp(), machine); incrementOnlineByOrder(order); @@ -1055,11 +1147,21 @@ class NodePool mlist = new HashMap<Node, Machine>(); machinesByOrder.put(order, mlist); } - mlist.put(machine.key(), machine); + mlist.put(key, machine); logger.info(methodName, null, "Nodepool:", id, "Host added:", id, ": ", machine.getId(), "Nodefile:", subpoolNames.get(machine.getId()), // UIMA-4142, add file nodefile String.format("shares %2d total %4d:", order, total_shares), machine.toString()); updated++; + + Properties props = initDbProperties(allMachines.get(key)); + props.put(RmPropName.Responsive.pname(), true); + props.put(RmPropName.Online.pname(), true); + try { + Object dbid = persistence.createMachine(machine.getId(), props); + machine.setDbId(dbid); + } catch (Exception e) { + logger.warn(methodName, null, "Cannot write machine to DB:", machine.getId(), e); + } return machine; } @@ -1131,6 +1233,7 @@ class NodePool void nodeLeaves(Machine m) { disable(m, unresponsiveMachines); + signalDb(m, RmPropName.Responsive, false); } // UIMA-4142 @@ -1171,6 +1274,7 @@ class NodePool Node key = mm.key(); iter.remove(); offlineMachines.put(key, mm); + signalDb(m, RmPropName.Online, false); return "VaryOff: Nodepool " + id + " - Unresponsive machine, marked offline: " + node; } } @@ -1179,6 +1283,7 @@ class NodePool } disable(m, offlineMachines); + signalDb(m, RmPropName.Online, false); return "VaryOff: " + node + " - OK."; } @@ -1197,6 +1302,7 @@ class NodePool Machine mm = iter.next(); if ( mm.getId().equals(node) ) { iter.remove(); + signalDb(mm, RmPropName.Online, true); return "VaryOn: Nodepool " + id + " - Machine marked online: " + node; } } @@ -1205,6 +1311,7 @@ class NodePool while ( iter.hasNext() ) { Machine mm = iter.next(); if ( mm.getId().equals(node) ) { + signalDb(mm, RmPropName.Online, true); return "VaryOn: Nodepool " + id + " - Machine is online but not responsive: " + node; } } Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java?rev=1708149&r1=1708148&r2=1708149&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java Mon Oct 12 16:10:55 2015 @@ -38,6 +38,8 @@ import org.apache.uima.ducc.common.admin import org.apache.uima.ducc.common.admin.event.RmQueriedMachine; import org.apache.uima.ducc.common.admin.event.RmQueriedNodepool; import 
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java?rev=1708149&r1=1708148&r2=1708149&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java Mon Oct 12 16:10:55 2015
@@ -38,6 +38,8 @@ import org.apache.uima.ducc.common.admin
 import org.apache.uima.ducc.common.admin.event.RmQueriedMachine;
 import org.apache.uima.ducc.common.admin.event.RmQueriedNodepool;
 import org.apache.uima.ducc.common.component.AbstractDuccComponent;
+import org.apache.uima.ducc.common.persistence.rm.IRmPersistence;
+import org.apache.uima.ducc.common.persistence.rm.RmPersistenceFactory;
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.DuccProperties;
 import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
@@ -92,6 +94,7 @@ public class Scheduler
     //HashMap<Node, Node> incomingNodes = new HashMap<Node, Node>(); // node updates
     Map<Node, Node> deadNodes = new HashMap<Node, Node>(); // missed too many heartbeats
+    Map<Node, Integer> illNodes = new HashMap<Node, Integer>(); // starting to miss, keep track of how many for the db
     // HashMap<Node, Node> allNodes = new HashMap<Node, Node>(); // the guys we know
     Map<String, NodePool> nodepoolsByNode = new HashMap<String, NodePool>(); // all nodes, and their associated pool
     Map<String, String> shortToLongNode = new HashMap<String, String>(); //
@@ -234,6 +237,8 @@ public class Scheduler
         logger.info(methodName, null, " class definition file : ", class_definitions);
         logger.info(methodName, null, " default domain : ", defaultDomain); // UIMA-4142
         logger.info(methodName, null, " eviction policy : ", evictionPolicy);
+        logger.info(methodName, null, " database enabled : ", !System.getProperty("ducc.database.host").equals("--disabled--"));
+        logger.info(methodName, null, " database implementation : ", System.getProperty("ducc.rm.persistence.impl"));
         logger.info(methodName, null, " use prediction : ", SystemPropertyResolver.getBooleanProperty("ducc.rm.prediction", true));
         logger.info(methodName, null, " prediction fudge factor : ", SystemPropertyResolver.getIntProperty("ducc.rm.prediction.fudge", 10000));
         logger.info(methodName, null, " node stability : ", nodeStability);
@@ -259,6 +264,8 @@ public class Scheduler
         logger.info(methodName, null, " RM Version : ", ""+ rmversion_major + "." + rmversion_minor + "." + rmversion_ptf);

+        IRmPersistence persistence = RmPersistenceFactory.getInstance(this.getClass().getName(), "RM");
+        persistence.clear();
         initialized = true;
     }
@@ -794,6 +801,34 @@ public class Scheduler
         stability = true;
     }

+    protected void handleIllNodes()
+    {
+        String methodName = "handleIllNodes";
+
+        if ( ! isInitialized() ) {
+            logger.info(methodName, null, "Waiting for (re)initialization.");
+            return;
+        }
+
+        HashMap<Node, Integer> nodeUpdates = new HashMap<Node, Integer>();
+        synchronized(deadNodes) {
+            nodeUpdates.putAll(illNodes);
+            illNodes.clear();
+        }
+
+        synchronized(this) {
+            for ( Node n : nodeUpdates.keySet() ) {
+                Machine m = getMachine(n);
+                int count = nodeUpdates.get(n);
+                if ( count == 0 ) {
+                    m.heartbeatArrives();
+                } else {
+                    m.heartbeatMissed(count);
+                }
+            }
+        }
+    }
+
     protected void handleDeadNodes()
     {
         String methodName = "handleDeadNodes";
@@ -855,6 +890,7 @@ public class Scheduler
         // tracking the OR hang problem - are topics being delivered?
         logger.info("nodeArrives", null, "Total arrivals:", total_arrivals);
+        handleIllNodes();
         handleDeadNodes();
         resetNodepools();
@@ -1097,6 +1133,10 @@ public class Scheduler
             return;
         }

+        synchronized(illNodes) { // stop flagging it as a problem
+            illNodes.remove(node);
+        }
+
         // String methodName = "nodeArrives";

         // The first block insures the node is in the scheduler's records as soon as possible
@@ -1120,6 +1160,14 @@ public class Scheduler
         max_order = Math.max(share_order, max_order);
         m = np.nodeArrives(node, share_order); // announce to the nodepools
+        m.heartbeatArrives();
+    }
+
+    public void nodeHb(Node n, int count)
+    {
+        synchronized(illNodes) {
+            illNodes.put(n, count);
+        }
     }

     public void nodeDeath(Map<Node, Node> nodes)
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java?rev=1708149&r1=1708148&r2=1708149&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java Mon Oct 12 16:10:55 2015
@@ -36,9 +36,10 @@ public class Share
     implements SchedConstants
 {
     //private transient DuccLogger logger = DuccLogger.getLogger(Share.class, COMPONENT_NAME);
-    private Machine machine; // machine associatede with this share, assigned after "what-of"
+
+    private transient Machine machine; // machine associatede with this share, assigned after "what-of"
     private DuccId id = null; // unique *within this machine* assigned after "what-of"
-    private IRmJob job = null;; // job executing in this share, if any, assigned after "what-of"
+    private transient IRmJob job = null;; // job executing in this share, if any, assigned after "what-of"
     private DuccId bljobid = null; // UIMA-4142 ID of blacklisted job

     private int share_order; // may not be same as machine's order

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java?rev=1708149&r1=1708148&r2=1708149&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceManagerComponent.java Mon Oct 12 16:10:55 2015
@@ -38,6 +38,7 @@ import org.apache.uima.ducc.common.crypt
 import org.apache.uima.ducc.common.crypto.Crypto.AccessType;
 import org.apache.uima.ducc.common.main.DuccService;
 import org.apache.uima.ducc.common.persistence.services.IStateServices;
+import org.apache.uima.ducc.common.persistence.services.IStateServices.SvcProps;
 import org.apache.uima.ducc.common.persistence.services.StateServicesDirectory;
 import org.apache.uima.ducc.common.persistence.services.StateServicesFactory;
 import org.apache.uima.ducc.common.persistence.services.StateServicesSet;
@@ -116,7 +117,7 @@ public class ServiceManagerComponent
     private String state_file = null;
     private DuccProperties sm_props = null;

-    private String service_seqno = "service.seqno";
+    private String service_seqno = SvcProps.service_seqno.pname();
     private DuccIdFactory idFactory = new DuccIdFactory();

     private boolean signature_required = true;
@@ -177,6 +178,8 @@ public class ServiceManagerComponent
                     continue;
                 }

+                System.out.println("Meta id " + metaprops.get("meta_dbid"));
+                System.out.println("Svc id " + metaprops.get("svc_dbid"));
                 DuccId id = new DuccId(friendly);
                 id.setUUID(UUID.fromString(uuid));
                 logger.debug(methodName, id, "Unique:", id.getUnique());
@@ -333,6 +336,9 @@ public class ServiceManagerComponent
         logger.info(methodName, null, " Service ping stability : ", meta_ping_stability);
         logger.info(methodName, null, " Default ping class : ", default_ping_class);
         logger.info(methodName, null, "");
+        logger.info(methodName, null, " database enabled : ", !System.getProperty("ducc.database.host").equals("--disabled--"));
+        logger.info(methodName, null, " database implementation : ", System.getProperty("ducc.service.persistence.impl"));
+        logger.info(methodName, null, "");
         logger.info(methodName, null, " Init Failure Max : ", init_failure_max);
         logger.info(methodName, null, " Instance Failure Max : ", failure_max);
         logger.info(methodName, null, " Instance Failure Window : ", failure_window);
@@ -821,18 +827,18 @@ public class ServiceManagerComponent
         props.put(UiOption.LogDirectory.pname(), logdir);

         DuccProperties meta = new DuccProperties();
-        meta.setProperty("user", user);
-        meta.setProperty("instances", ""+instances);
-        meta.setProperty("endpoint", endpoint);
-        meta.setProperty("numeric_id", id.toString());
-        meta.setProperty("uuid", id.getUnique());
-        meta.setProperty("registration-date-millis", Long.toString(regdate));
-        meta.setProperty("registration-date", regdate_readable);
+        meta.setProperty(SvcProps.user.pname(), user);
+        meta.setProperty(SvcProps.instances.pname(), ""+instances);
+        meta.setProperty(SvcProps.endpoint.pname(), endpoint);
+        meta.setProperty(SvcProps.numeric_id.pname(), id.toString());
+        meta.setProperty(SvcProps.uuid.pname(), id.getUnique());
+        meta.setProperty(SvcProps.registration_date_millis.pname(), Long.toString(regdate));
+        meta.setProperty(SvcProps.registration_date.pname(), regdate_readable);
         if ( autostart == Trinary.True ) {
-            meta.setProperty("autostart", "true");
+            meta.setProperty(SvcProps.autostart.pname(), "true");
         } else {
-            meta.setProperty("autostart", "false");
+            meta.setProperty(SvcProps.autostart.pname(), "false");
         }

         ServiceReplyEvent reply = handler.register(id, props, meta, false);
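
[Note] The ServiceManagerComponent changes above (and the ServiceSet changes that follow) replace raw property-name strings with members of IStateServices.SvcProps. The enum itself is not shown in this diff; a minimal sketch of the shape it presumably has, where pname() returns the legacy key written to the registry files (the member list below is inferred from the literals this commit removes and is otherwise an assumption):

    // Hypothetical sketch only -- illustrates the pname() pattern used above.
    public interface IStateServices {
        enum SvcProps {
            user("user"),
            instances("instances"),
            endpoint("endpoint"),
            numeric_id("numeric_id"),
            uuid("uuid"),
            registration_date_millis("registration-date-millis"),
            registration_date("registration-date"),
            autostart("autostart"),
            service_seqno("service.seqno");

            private final String pname;
            SvcProps(String pname) { this.pname = pname; }

            // The key actually written to the svc/meta properties files.
            public String pname() { return pname; }
        }
    }
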
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java?rev=1708149&r1=1708148&r2=1708149&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java Mon Oct 12 16:10:55 2015
@@ -817,8 +817,8 @@ public class ServiceSet
         if ( ! isRecovered ) { // if not recovery, no need to mess with the record
             stateHandler.storeProperties(id, job_props, meta_props);
         } else {
-            stateHandler.updateJobProperties(id, (Properties) job_props);
-            stateHandler.updateMetaProperties(id, meta_props);
+            stateHandler.updateJobProperties(meta_props.get("svc_dbid"), id, (Properties) job_props);
+            stateHandler.updateMetaProperties(meta_props.get("meta_dbid"), id, meta_props);
         }
     }
@@ -828,7 +828,7 @@ public class ServiceSet
         // no more changes
         if ( isDeregistered() ) return;

-        stateHandler.updateJobProperties(id, (Properties) job_props);
+        stateHandler.updateJobProperties(meta_props.get("svc_dbid"), id, (Properties) job_props);
     }

     synchronized void updateMetaProperties()
@@ -839,7 +839,7 @@ public class ServiceSet
         // if ( isDeregistered() ) return;

         prepareMetaProperties();
-        stateHandler.updateMetaProperties(id, meta_props);
+        stateHandler.updateMetaProperties(meta_props.get("meta_dbid"), id, meta_props);
     }

     void prepareMetaProperties()

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/event/ServiceManagerEventListener.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/event/ServiceManagerEventListener.java?rev=1708149&r1=1708148&r2=1708149&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/event/ServiceManagerEventListener.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/event/ServiceManagerEventListener.java Mon Oct 12 16:10:55 2015
@@ -219,7 +219,7 @@ public class ServiceManagerEventListener
         throws Exception
     {
         String methodName = "onOrchestratorStateDuccEvent";
-        // System.out.println("......... Service Manager Received OrchestratorStateDuccEvent.");
+        System.out.println("......... Service Manager Received OrchestratorStateDuccEvent.");
         // serviceManager.evaluateServiceRequirements(duccEvent.getWorkMap());
         try {
             serviceManager.orchestratorStateArrives(duccEvent.getWorkMap());

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/nodeviz/NodeViz.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/nodeviz/NodeViz.java?rev=1708149&r1=1708148&r2=1708149&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/nodeviz/NodeViz.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-web/src/main/java/org/apache/uima/ducc/ws/server/nodeviz/NodeViz.java Mon Oct 12 16:10:55 2015
@@ -248,6 +248,8 @@ public class NodeViz
                 Node n = r.getNode();
                 if ( n == null ) {
                     logger.debug(methodName, w.getDuccId(), "Node [N/A] mem[N/A");
+                } else if ( n.getNodeIdentity() == null ) {
+                    logger.debug(methodName, w.getDuccId(), "NodeIdentity [N/A] mem[N/A");
                 } else {
                     String key = strip(n.getNodeIdentity().getName());
                     VisualizedHost vh = hosts.get(key);