Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/StateServicesFactory.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/StateServicesFactory.java?rev=1703203&r1=1703202&r2=1703203&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/StateServicesFactory.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/StateServicesFactory.java Tue Sep 15 14:31:24 2015 @@ -18,14 +18,61 @@ */ package org.apache.uima.ducc.common.persistence.services; +import org.apache.uima.ducc.common.main.DuccService; +import org.apache.uima.ducc.common.utils.DuccLogger; -public class StateServicesFactory { - private static IStateServices instance = new StateServices(); - - public static IStateServices getInstance() { - return instance; +public class StateServicesFactory +{ + private static IStateServices instance = null; + + private static IStateServices getInstanceInternal(String callerClass, String component) + { + String methodName = "getInstance"; + // log4j logging annoyance. We require the caller to give us its base package so + // we can configure a logger that writes to the right appender + // log4j logging annoyance. We require the caller to give us its base package so + // we can configure a logger that writes to the right appender + int ndx = callerClass.lastIndexOf("."); + String stem = callerClass.substring(0, ndx); + + String clname = System.getProperty("ducc.service.persistence.impl"); + if ( clname == null ) { + DuccLogger logger = DuccService.getDuccLogger(); + logger.warn(methodName, null, "Service persistence manager is not configured. 
Returning null instance."); + return new NullStateServices(); + } + ndx = clname.lastIndexOf("."); + String clfile = clname.substring(ndx+1); + // + // We try to construct the persistence object. If it fails, we return a + // "null" object conforming to the interface but doing nothing to hopefully + // reduce NPEs. + // + DuccLogger logger = DuccLogger.getLogger(stem + "." + clfile, "DB"); // get the component logger + + IStateServices ret = null; + try { + @SuppressWarnings("unchecked") + Class<IStateServices> iss = (Class<IStateServices>) Class.forName(clname); + ret = (IStateServices) iss.newInstance(); + ret.init(logger); + } catch ( Throwable t ) { + logger.error(methodName, null, "Cannot instantiate service persistence class", clname, ":", t); + ret = new NullStateServices(); + } + + return ret; + } + + public static IStateServices getInstance(String callerClass, String component) + { + if ( instance == null ) { + instance = getInstanceInternal(callerClass, component); + } + + return instance; } }
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/StateServicesSet.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/StateServicesSet.java?rev=1703203&r1=1703202&r2=1703203&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/StateServicesSet.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/StateServicesSet.java Tue Sep 15 14:31:24 2015 @@ -19,17 +19,18 @@ package org.apache.uima.ducc.common.persistence.services; import java.util.HashMap; -import java.util.Properties; + +import org.apache.uima.ducc.common.utils.DuccProperties; public class StateServicesSet { - private HashMap<String,Properties> map = new HashMap<String,Properties>(); + private HashMap<String, DuccProperties> map = new HashMap<String, DuccProperties>(); - public void put(String key, Properties value) { + public void put(String key, DuccProperties value) { map.put(key, value); } - public Properties get(String key) { + public DuccProperties get(String key) { return map.get(key); } } Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/utils/DuccProperties.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/utils/DuccProperties.java?rev=1703203&r1=1703202&r2=1703203&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/utils/DuccProperties.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/utils/DuccProperties.java 
Tue Sep 15 14:31:24 2015 @@ -34,6 +34,25 @@ public class DuccProperties extends Prop protected boolean resolvePlaceholders = true; // Disabled by CLI for JobRequestProperties + + /** + * Null constructor now requried because we have a non-null constructor below. + */ + public DuccProperties() + { + super(); + } + + /** + * Convert a run-of-the-mill properties object into a handsome DuccProperties + */ + public DuccProperties(Properties p) + { + for ( Object k : p.keySet() ) { + put(k, p.get(k)); + } + } + public void load() throws Exception { Properties tmp = Utils.loadPropertiesFromClasspathForResource("agent"); for (Iterator<Entry<Object, Object>> it = tmp.entrySet().iterator(); it Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/test/java/org/apache/uima/ducc/common/test/cmd/StateServicesTest.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/test/java/org/apache/uima/ducc/common/test/cmd/StateServicesTest.java?rev=1703203&r1=1703202&r2=1703203&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/test/java/org/apache/uima/ducc/common/test/cmd/StateServicesTest.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/test/java/org/apache/uima/ducc/common/test/cmd/StateServicesTest.java Tue Sep 15 14:31:24 2015 @@ -18,9 +18,8 @@ */ package org.apache.uima.ducc.common.test.cmd; -import java.io.IOException; -import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Map.Entry; import java.util.NavigableSet; import java.util.Properties; @@ -35,7 +34,7 @@ public class StateServicesTest // extends StateServices { - public static void main(String[] args) throws IOException, ClassNotFoundException { + public static void main(String[] args) throws Exception, ClassNotFoundException { String ducc_home = Utils.findDuccHome(); if(ducc_home == null) { System.out.println("DUCC_HOME not set in 
environment"); @@ -45,22 +44,22 @@ public class StateServicesTest System.out.println("DUCC_HOME not set in environment"); return; } - IStateServices ss = StateServicesFactory.getInstance(); - ArrayList<String> svcList = ss.getSvcList(); - for(String fname : svcList) { + IStateServices ss = StateServicesFactory.getInstance(StateServicesTest.class.getName(), "TEST"); + List<Long> svcList = ss.getSvcList(); + for(Long fname : svcList) { System.out.println(fname); } - ArrayList<String> metaList = ss.getMetaList(); - for(String fname : metaList) { + List<Long> metaList = ss.getMetaList(); + for(Long fname : metaList) { System.out.println(fname); } StateServicesDirectory ssd = ss.getStateServicesDirectory(); - NavigableSet<Integer> keySet = ssd.getDescendingKeySet(); - Iterator<Integer> iterator = keySet.iterator(); + NavigableSet<Long> keySet = ssd.getDescendingKeySet(); + Iterator<Long> iterator = keySet.iterator(); while(iterator.hasNext()) { String svc = IStateServices.svc; String meta = IStateServices.meta; - Integer i = iterator.next(); + Long i = iterator.next(); StateServicesSet sss = ssd.get(i); Properties propertiesSvc = sss.get(svc); Iterator<Entry<Object, Object>> iteratorSvc = propertiesSvc.entrySet().iterator(); Added: uima/sandbox/uima-ducc/trunk/uima-ducc-database/pom.xml URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/pom.xml?rev=1703203&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/pom.xml (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/pom.xml Tue Sep 15 14:31:24 2015 @@ -0,0 +1,98 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + + <parent> + <artifactId>uima-ducc-parent</artifactId> + <groupId>org.apache.uima</groupId> + <version>2.1.0-SNAPSHOT</version> + <relativePath>../uima-ducc-parent/pom.xml</relativePath> + </parent> + + <!-- Inherits groupid and version from the parent pom project coordinates --> + <!-- Uses default packaging ie. jar --> + <artifactId>uima-ducc-database</artifactId> + <name>${uima.ducc} ${project.artifactId}</name> + + + <!-- Special inheritance note even though the <scm> element that follows + is exactly the same as those in super poms, it cannot be inherited because + there is some special code that computes the connection elements from the + chain of parent poms, if this is omitted. 
Keeping this a bit factored allows + cutting/pasting the <scm> element, and just changing the following two properties --> + <scm> + <connection> + scm:svn:http://svn.apache.org/repos/asf/uima/sandbox/uima-ducc/trunk/uima-ducc-database + </connection> + <developerConnection> + scm:svn:https://svn.apache.org/repos/asf/uima/sandbox/uima-ducc/trunk/uima-ducc-database + </developerConnection> + <url> + http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database + </url> + </scm> + + <dependencyManagement> + <dependencies> + </dependencies> + </dependencyManagement> + + <dependencies> + <!-- Dependencies on other DUCC projects --> + <dependency> + <groupId>org.apache.uima</groupId> + <artifactId>uima-ducc-transport</artifactId> + </dependency> + + + <dependency> + <groupId>com.orientechnologies</groupId> + <artifactId>orientdb-graphdb</artifactId> + <version>${orientdb.version}</version> + </dependency> + + <dependency> + <groupId>com.orientechnologies</groupId> + <artifactId>orientdb-lucene</artifactId> + <version>${orientdb.version}</version> + </dependency> + + <dependency> + <groupId>com.tinkerpop.blueprints</groupId> + <artifactId>blueprints-core</artifactId> + <version>2.6.0</version> + </dependency> + + <dependency> + <groupId>com.googlecode.concurrentlinkedhashmap</groupId> + <artifactId>concurrentlinkedhashmap-lru</artifactId> + <version>1.4.2</version> + </dependency> + + <dependency> + <groupId>com.tinkerpop.gremlin</groupId> + <artifactId>gremlin-groovy</artifactId> + <version>2.6.0</version> + </dependency> + </dependencies> + +</project> Added: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbConstants.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbConstants.java?rev=1703203&view=auto ============================================================================== --- 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbConstants.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbConstants.java Tue Sep 15 14:31:24 2015 @@ -0,0 +1,119 @@ +package org.apache.uima.ducc.database; + +/** + * This enum defines the classes and constants used in the db schema + */ + +public interface DbConstants +{ + + static final String DUCCID = "ducc_dbid"; // DB-unique name for the duccid + static final String DUCC_DBCAT = "ducc_dbcat"; // The ducc database category: history, checkpoint, sm registry + + public interface Schema + { + String pname(); + } + + public enum DbCategory + implements Schema + { + Any { + // All categories - don't qualify the search + public String pname() { return "all"; } + }, + Checkpoint { + // OR checkpoint only + public String pname() { return "checkpoint"; } + }, + History { + // Completed and deleted stuff, all classes of objects + public String pname() { return "history"; } + }, + SmReg { + // Active service registration + public String pname() { return "smreg"; } + }, + + ; + } + + public enum DbVertex + implements Schema + { + // + // The convention is for vertices to start with Capital V and then a Capital + // + Job { // The serialized job instance from OR + public String pname() { return "VJob"; } + }, + + Classpath { + public String pname() { return "VClasspath"; } + }, + + CommandLine { + public String pname() { return "VCommandLine"; } + }, + + Driver { + public String pname() { return "VJobDriver"; } + }, + + Process { + public String pname() { return "VProcess"; } + }, + + ServiceReg { // The submitted service properties + public String pname() { return "VServiceReg"; } + }, + + ServiceMeta { // The Service metadata + public String pname() { return "VServiceMeta"; } + }, + + ServiceInstance { // The serialized service instance from OR + public String pname() { return "VServiceInstance"; } + }, + + 
Reservation { // The serialized reservation instance from OR + public String pname() { return "VReservation"; } + }, + + ProcessToJob { // For checkpoints, the process - to - job id map + public String pname() { return "VProcessToJob"; } + }, + + ; + + } + + public enum DbEdge + implements Schema + { + // + // The convention is for edges to start with lower e and then a lower + // + Edge { // Generic edge + public String pname() { return "ducc_edge"; } + }, + Classpath { // All record types, detached classpath + public String pname() { return "eclasspath"; } + }, + Driver { // From DuccWorkJob + public String pname() { return "edriver"; } + }, + JpProcess { // Process instance + public String pname() { return "eprocess"; } + }, + JdProcess { // Process instance + public String pname() { return "ejdprocess"; } + }, + ServiceMeta { // Setvice meta file + public String pname() { return "eservice_meta"; } + }, + + ; + + } +} Added: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java?rev=1703203&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java Tue Sep 15 14:31:24 2015 @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +*/ + +package org.apache.uima.ducc.database; + +import org.apache.uima.ducc.database.DbConstants.DbEdge; +import org.apache.uima.ducc.database.DbConstants.DbVertex; + +import com.orientechnologies.orient.client.remote.OServerAdmin; +import com.orientechnologies.orient.core.metadata.schema.OProperty; +import com.orientechnologies.orient.core.metadata.schema.OType; +import com.tinkerpop.blueprints.impls.orient.OrientEdgeType; +import com.tinkerpop.blueprints.impls.orient.OrientGraphFactory; +import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx; +import com.tinkerpop.blueprints.impls.orient.OrientVertexType; + +public class DbCreate +{ + String dburl; + OServerAdmin admin; + OrientGraphFactory factory; + + public DbCreate(String dburl) + { + this.dburl = dburl; + } + + void createEdgeType(OrientGraphNoTx g, DbEdge id) + { + String s = id.pname(); + OrientEdgeType e = g.getEdgeType(s); + if ( e == null ) { + System.out.println("Create edge " + s); + g.createEdgeType(s); + } + } + + void createVertexType(OrientGraphNoTx g, DbVertex id) + { + String s = id.pname(); + OrientVertexType e = g.getVertexType(s); + if ( e == null ) { + System.out.println("Create vertex " + s); + e = g.createVertexType(s); + OProperty p = e.createProperty(DbConstants.DUCCID, OType.LONG); + p.setMandatory(true); + } + } + + void createSchema() + { + OrientGraphNoTx g = factory.getNoTx(); + + for ( DbVertex o : DbVertex.values() ) { + createVertexType(g, o); + } + for ( DbEdge o : DbEdge.values() ) { + createEdgeType(g, o); + } + + g.shutdown(); + } + + /** + * Create 
the database and initialize the schema. This is intended to be called only from Main at + * system startup, to insure all users of the db have a db when they start. + */ + boolean createDatabase() + throws Exception + { + String pw = DbManager.dbPassword(); + + try { + admin = new OServerAdmin(dburl); + admin.connect("root", pw); // connect to the server + + if ( ! admin.existsDatabase("plocal") ) { + System.out.println("Database " + dburl + " does not exist, attempting to create it."); + admin.createDatabase("graph", "plocal"); + + if ( ! admin.existsDatabase() ) { + System.out.println("Cannot create database " + dburl); + return false; + } + factory = new OrientGraphFactory(dburl); + if ( factory == null ) { + System.out.println("Cannot create graph factory for " + dburl); + return false; + } + + createSchema(); + } + } finally { + if ( admin != null ) admin.close(); + if ( factory != null ) factory.close(); + } + return true; + } + + public static void main(String[] args) + { + if ( args.length != 1 ) { + System.out.println("Usage: DbCreate <database url>"); + System.exit(1); + } + try { + DbCreate dbc = new DbCreate(args[0]); + if ( ! 
dbc.createDatabase() ) { + System.out.println("Could not create database or schema for " + args[0]); + System.exit(1); + } + } catch ( Exception e ) { + System.out.println("Errors creating database"); + e.printStackTrace(); + System.exit(1); + } + System.exit(0); + } + +} Added: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java?rev=1703203&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java Tue Sep 15 14:31:24 2015 @@ -0,0 +1,490 @@ +package org.apache.uima.ducc.database; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.apache.uima.ducc.common.utils.DuccLogger; +import org.apache.uima.ducc.common.utils.DuccProperties; +import org.apache.uima.ducc.database.DbConstants.DbCategory; +import org.apache.uima.ducc.database.DbConstants.DbEdge; +import org.apache.uima.ducc.database.DbConstants.DbVertex; + +import com.orientechnologies.orient.core.record.impl.ODocument; +import com.orientechnologies.orient.core.sql.OCommandSQL; +import com.tinkerpop.blueprints.Direction; +import com.tinkerpop.blueprints.Edge; +import com.tinkerpop.blueprints.Vertex; +import com.tinkerpop.blueprints.impls.orient.OrientBaseGraph; +import com.tinkerpop.blueprints.impls.orient.OrientEdge; +import com.tinkerpop.blueprints.impls.orient.OrientVertex; + +public class DbHandle +{ + private DuccLogger logger = DuccLogger.getLogger(DbHandle.class, "DB"); // get the component logger + + DbManager manager; + public OrientBaseGraph 
graphDb; + + DbHandle(DbManager manager, OrientBaseGraph graphDb) + { + this.manager = manager; + this.graphDb = graphDb; + } + + + public void close() + { + if ( graphDb == null ) return; + graphDb.shutdown(); + graphDb = null; + } + + /** + * We use id + isHistory + vertex class = primary key + * @param id The numeric duccid of the 'thing' + * @param type the enum representing the database class of the object + * @param isHistory 'true' if we're searching history, 'false' otherwise + */ + public boolean thingInDatabase(long id, DbVertex type, DbCategory category) + throws Exception + { + String methodName = "thingInDatabase"; + Iterable<Vertex> vs = null; + if ( category == DbCategory.Any ) { + vs = select("SELECT count(*) FROM " + type.pname() + + " WHERE " + DbConstants.DUCCID + "=" + id); + } else { + vs = select("SELECT count(*) FROM " + type.pname() + + " WHERE " + DbConstants.DUCCID + "=" + id + + " AND " + DbConstants.DUCC_DBCAT + "='" + category.pname() + "'"); + } + + try { + for ( Vertex v : vs ) { + OrientVertex ov = (OrientVertex) v; + ODocument doc = ov.getRecord(); + long did = doc.field("count"); + return (did > 0); + } + } catch (Throwable e) { + logger.error(methodName, null, e); + } + return false; + } + + /** + * Find all DUCCIDs associated with the incoming type. + * + * @param type The type enum for the database class, e.g. Job + * @param isHistory 'true' to search history, 'false' otherwise + * + * @return List of human-reaable (aka "friendly" ducc ids as strings. 
+     */
+    public List<Long> listObjectsOfType(DbVertex type, DbCategory dbcat)
+    {
+        // Match on the vertex class alone for DbCategory.Any; otherwise also
+        // require the stored category to equal the requested one.
+        Iterable<Vertex> objs;
+        if ( dbcat == DbCategory.Any ) {
+            objs = graphDb.getVertices(type.pname(),
+                                       new String[] {"@class"},
+                                       new Object[]{type.pname()});
+        } else {
+            // The query result has to be captured here: leaving objs unassigned
+            // made every category-qualified call fail with a NullPointerException
+            // in the loop below.
+            objs = graphDb.getVertices(type.pname(),
+                                       new String[] {"@class", DbConstants.DUCC_DBCAT},
+                                       new Object[]{type.pname(), dbcat.pname()});
+        }
+
+        // Collect the duccid stored on each matching vertex.
+        List<Long> ret = new ArrayList<Long>();
+        for ( Vertex v : objs ) {
+            OrientVertex ov = (OrientVertex) v;
+            ret.add((Long)ov.getProperty(DbConstants.DUCCID));
+        }
+
+        return ret;
+    }
+
+    /**
+     * Convert the vertex property set to a DuccProperties, scrubbing the
+     * ducc metadata.
+     *
+     * @param v The vertex whose properties are copied.
+     * @return A DuccProperties holding every property of the vertex except the
+     *         DUCCID and DUCC_DBCAT bookkeeping fields.
+     */
+    Properties vertexToProps(OrientVertex v)
+    {
+        Properties ret = new DuccProperties();
+        Set<String> keys = v.getPropertyKeys();
+        for (String k : keys) {
+            // skip the bookkeeping fields added when the object was stored
+            if ( k.equals(DbConstants.DUCCID) ) continue;
+            if ( k.equals(DbConstants.DUCC_DBCAT) ) continue;
+            ret.put(k, v.getProperty(k));
+        }
+        return ret;
+    }
+
+    /**
+     * Find the objects of the given type and return their Properties, indexed by duccid.
+     *
+     * NOTE This returns the properties as DuccProperties so you can safely cast and use the
+     * extended function of that class if you want.
+     *
+     * @param type The type enum for the object class (e.g.
Job) + * @param isHistory 'true' to search history, 'false' otherwise + */ + public Map<Long, Properties> getPropertiesForType(DbVertex type, DbCategory dbcat) + throws Exception + { + String methodName = "getPropertiesForType"; + + Iterable<Vertex> vs = null; + if ( dbcat == DbCategory.Any ) { + vs = graphDb.getVertices(type.pname(), + new String[] {"@class"}, + new Object[]{type.pname()}); + + } else { + String a = type.pname(); + String[] b = new String[] {"@class", DbConstants.DUCC_DBCAT}; + Object[] c = new Object[]{type.pname(), dbcat.pname()}; + vs = graphDb.getVertices(a, b, c); + + + } + + Map<Long, Properties> ret = new HashMap<Long, Properties>(); + + try { + for ( Vertex v : vs ) { + OrientVertex ov = (OrientVertex) v; + Properties props = vertexToProps(ov); + Long did = ov.getProperty(DbConstants.DUCCID); + ret.put(did, props); + } + } catch (Throwable e) { + logger.error(methodName, null, "Database access error: ", e); + } + + return ret; + } + + /** + * Use this for selecting, it returns a set of stuff + */ + public Iterable<Vertex> select(String sql) + throws Exception + { + String methodName = "select"; + logger.info(methodName, null, "SQL", sql); + return graphDb.command(new OCommandSQL(sql)).execute(); + } + + /** + * Use this for just executing stuff that returns an int rc + */ + public int execute(String sql) + { + String methodName = "execute"; + logger.info(methodName, null, "SQL", sql); + return graphDb.command(new OCommandSQL(sql)).execute(); + } + + public void commit() + { + if ( graphDb != null ) graphDb.commit(); + } + + public void rollback() + { + String methodName = "rollback"; + logger.warn(methodName, null, "ROLLBACK"); + if ( graphDb != null ) graphDb. + rollback(); + } + + /** + * Delete the object of the indicated type and duccid. We optionally commit in case we want to + * do more things that have to work under the same transaction so we can rollback if needed.= + * + * Nobody is deleting; everything is moved to history. 
Later may be utilities to do some cleanup + * and we'll bring it back. + */ + // public void deleteObject(DbVertex type, Long duccid, boolean commit) + // throws Exception + // { + // String methodName = "deleteObject"; + // // there usually should only be ONE of these but the API is defined in terms of many + // // TODO: throw and rollback if more than one object ( I think, let's revisit this ) + // Iterable<Vertex> s = graphDb.getVertices(type.pname(), new String[] {"@class", DbConstants.DUCCID}, new Object[]{type.pname(), duccid}); + // for ( Vertex v : s ) { + // //logger.info(methodName, null, "Delete vertex for class", type, "id", duccid, "commit", commit); + // graphDb.removeVertex(v); + // } + + // if ( commit ) graphDb.commit(); + // } + + /** + * Plop the object into the DB under the class indicated by 'type', with the + * unique key 'duccid'. + * + * We use id + isHistory + vertex class = primary key and hence must insure they're always set. + * + * @prarm type The type enum of the thing to save (e.g. Job) + * @param duccid The numeric ducc id of the object + * @param obj The json-ified object to save + * @param isHistory 'true' if we save to history, 'false' otherwise + */ + public Object saveObject(DbVertex type, Long duccid, String obj, DbCategory dbcat) + { + //String methodName = "saveObject"; + + //String typename = type.pname(); + + OrientVertex ret = null; + ODocument document = null; + document = new ODocument(type.pname()); + ret = new OrientVertex(graphDb, document); + + document.fromJSON(obj); + document.field(DbConstants.DUCCID, duccid); + document.field(DbConstants.DUCC_DBCAT, dbcat.pname()); + graphDb.getRawGraph().save(document); + + return ret; + } + + + /** + * Helper class for retrieving an object and all the stuff it points to. e.g. if you want to + * reconstitue a DuccWorkJob you need to chase the edges to get the process map and the jd and + * probably other stuff. 
+     *
+     * We don't care about history here because the call will have done the right search first.
+     *
+     * @param v The vertex discovered by the caller.
+     * @param read_all 'true' to do recursive traversal down the edges, false otherwise.
+     *
+     * NOTE: I think the db may have a primitive to do this traversal,
+     * @TODO must research and use it as it will likely be safer and more efficient.
+     *
+     */
+    private DbObject traverseObject(OrientVertex v, boolean read_all)
+    {
+        //String methodName = "traverseObject";
+        ODocument doc = v.getRecord();
+        String doc_json = doc.toJSON();
+        String stype = v.getProperty("@class");
+        DbVertex type = manager.vertexType(stype);
+
+        DbObject ret = new DbObject(doc_json, type);
+
+        if ( read_all ) {
+            // Recurse into the vertex at the far (IN) end of every outgoing edge
+            // and embed what is found there in the returned object.
+            Iterable<Edge> ed = v.getEdges(Direction.OUT);
+            for ( Edge e : ed ) {
+                OrientEdge oe = (OrientEdge) e;
+                OrientVertex ov = oe.getVertex(Direction.IN);
+                //logger.info(methodName, null, "Found edge connected to vertex of type", ov.getProperty("@class"), "duccid", ov.getProperty(DUCCID));
+                ret.addEmbedded(traverseObject(ov, read_all));
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Read a database object, optionally chasing edges to get all the various bits.
+     *
+     * @param type The type enum of the object, e.g. "Job"
+     * @param duccid The numeric id of the object
+     * @param read_all 'true' to recursively chase down the edges, 'false' otherwise.
+     *
+     * @return The object, with everything reachable over its outgoing edges embedded when
+     *         read_all is 'true'; null if no object of that type and id is in the db.
+     */
+    public DbObject readObject(DbVertex type, Long duccid, boolean read_all)
+    {
+        String methodName = "readObject";
+        Iterable<Vertex> s = graphDb.getVertices(type.pname(), new String[] {"@class", DbConstants.DUCCID}, new Object[]{type.pname(), duccid});
+        int count = 0;
+        OrientVertex fv = null;
+        for (Vertex v : s) {
+            if ( fv == null ) fv = (OrientVertex) v;    // keep the first match, as the message below promises
+            count++;
+        }
+
+        if ( fv == null ) {
+            // Nothing matched: report it and return null instead of letting
+            // traverseObject dereference a null vertex.
+            logger.warn(methodName, null, "No object of type " + type.pname() + " found for duccid " + duccid);
+            return null;
+        }
+        if ( count > 1 ) {
+            logger.error(methodName, null, "Expected unique object, found", count, "objects. Returning only the first object.");
+        }
+
+        return traverseObject(fv, read_all);
+    }
+
+    /**
+     * Link the 'from' to all the 'to''s' with edges of the right type. Everything is expected
+     * to be in the db already, but maybe not yet committed, according to the caller.
+     *
+     * @param from the source object
+     * @param to a list of the things the source is pointing/linking to
+     * @param type The type enum for edge, e.g. 'process'
+     */
+    public void addEdges(Object from, List<Object> to, DbEdge type)
+    {
+        //String methodName = "addEdges";
+        OrientVertex fv = (OrientVertex) from;
+        String etype = type.pname();
+
+        // one edge of the requested type from the source to each target
+        for ( Object o : to ) {
+            OrientVertex tv = (OrientVertex) o;
+            fv.addEdge(etype, tv);
+        }
+
+    }
+
+
+    /**
+     * Link the 'from' to a single 'to' with an edge of the right type. Everything is expected
+     * to be in the db already, but maybe not yet committed, according to the caller.
+     *
+     * @param from the source object
+     * @param to a single object to point to
+     * @param type The type enum for edge, e.g.
'process' + */ + public void addEdge(Object from, Object to, DbEdge type) + { + //String methodName = "addEdges"; + OrientVertex fv = (OrientVertex) from; + OrientVertex tv = (OrientVertex) to; + String etype = type.pname(); + + fv.addEdge(etype, tv); + } + + // public void addEdges(DbVertex typeFrom, Long fid, DbVertex typeTo, List<Long> tids) + // { + // String methodName = "addEdges"; + + // Iterable<Vertex> s = graphDb.getVertices(typeFrom.pname(), new String[] {"@class", DUCCID}, new Object[]{typeFrom.pname(), fid}); + // int count = 0; + // OrientVertex fv = null; + // for (Vertex v : s) { + // fv = (OrientVertex) v; + // //logger.info(methodName, null, "from vertex", count, fv.getIdentity()); + // count++; + // } + + // for ( Long tid : tids ) { + // Iterable<Vertex> ss = graphDb.getVertices(typeTo.pname(), new String[] {"@class", DUCCID}, new Object[]{typeTo.pname(), tid}); + // count = 0; + // for (Vertex v : ss) { + // OrientVertex tv = (OrientVertex) v; + // // logger.info(methodName, null, "to vertex", count, tv.getIdentity()); + // count++; + + // fv.addEdge(DbEdge.Edge.pname(), tv); + // } + // } + + + // } + + // public void addEdge(DbVertex typeFrom, Long fid, DbVertex typeTo, Long tid) + // { + // String methodName = "addEdge"; + + // Iterable<Vertex> s = graphDb.getVertices(typeFrom.pname(), new String[] {"@class", DUCCID}, new Object[]{typeFrom.pname(), fid}); + // int count = 0; + // OrientVertex fv = null; + // for (Vertex v : s) { + // fv = (OrientVertex) v; + // //logger.info(methodName, null, "from vertex", count, fv.getIdentity()); + // count++; + // } + + // Iterable<Vertex> ss = graphDb.getVertices(typeTo.pname(), new String[] {"@class", DUCCID}, new Object[]{typeTo.pname(), tid}); + // count = 0; + // for (Vertex v : ss) { + // OrientVertex tv = (OrientVertex) v; + // // logger.info(methodName, null, "to vertex", count, tv.getIdentity()); + // count++; + + // fv.addEdge(DbEdge.Edge.pname(), tv); + // } + + + // } + + /** + * Create an 
object in the db from a properties object. + * + * @param props The properties object to be placed in the db. + * @param type The type enum for the object, e.g. "Service" + * @param duccid The numeric id of the object + * @param dbcat The database category (e.g. history) recorded on the object as its DUCC_DBCAT property. + */ + public Object createPropertiesObject(Properties props, DbVertex type, Long duccid, DbCategory dbcat) + { + // Note: caller must ensure this is the first time for this object if a duplicate is not wanted, + // by calling thingInDatabase(). + + //String methodName = "createPropertiesObject"; + String typeName = type.pname(); + + OrientVertex ov = null; + + // logger.info(methodName, null, duccid, "Create new db record of type", typeName); + ov = graphDb.addVertex("class:" + typeName); + ov.setProperties(DbConstants.DUCCID, duccid); + ov.setProperties(props); + ov.setProperty(DbConstants.DUCC_DBCAT, dbcat.pname()); + return ov; + } + + /** + * Use the incoming properties to set the properties on the object of given type and duccid. + * Rules: + * 1. The object must already exist in the db, exactly once; otherwise an IllegalStateException is thrown. + * 2. If the property exists in both, update the value. + * 3. If the property exists only in db object, delete from the db object. + * 4. If the property exists only in input, add to db object. + * 5. Caller must commit, allowing for multiple things in a transaction + * @param props The properties to sync with + * @param type The type of object to update (e.g. Service, ServiceMeta, Job etc) + * @param duccid the duccid of the object + * @param dbcat The database category (e.g. history) recorded on the object as its DUCC_DBCAT property + */ + public void syncProperties(Properties props, DbVertex type, Long duccid, DbCategory dbcat) + throws Exception + { + //String methodName = "syncProperties"; + + // The assumption is that only one object of the given DbVertex.type and duccid is allowed in the + // database. 
+ Iterable<Vertex> s = graphDb.getVertices(type.pname(), new String[] {"@class", DbConstants.DUCCID}, new Object[]{type.pname(), duccid}); + + OrientVertex ov = null; + int count = 0; // some sanity checking, we're not allowed more than one + for (Vertex v : s) { + ov = (OrientVertex) v; + count++; + } + + if ( count > 1 ) { + throw new IllegalStateException("Multiple database records for " + type + "." + duccid); + } + + if ( count == 0 ) { + throw new IllegalStateException("No record found to update for " + type + "." + duccid); + } + + //logger.info(methodName, null, duccid, "Update record of type", type); + Set<String> keys = ov.getPropertyKeys(); + for (String k : keys) { // (clear a property according to rule 3 above) + if ( ! k.equals(DbConstants.DUCCID) && !props.containsKey(k) ) { + ov.removeProperty(k); + } + } + ov.setProperties(props); // handles both rules 2 and 4 + ov.setProperty(DbConstants.DUCC_DBCAT, dbcat.pname()); + //graphDb.getRawGraph().save(ov.getRecord()); + ov.save(); + + } + +} Added: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java?rev=1703203&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java Tue Sep 15 14:31:24 2015 @@ -0,0 +1,597 @@ +package org.apache.uima.ducc.database; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import 
java.util.concurrent.atomic.AtomicInteger; + +import org.apache.uima.ducc.common.persistence.services.IStateServices; +import org.apache.uima.ducc.common.utils.DuccLogger; +import org.apache.uima.ducc.common.utils.id.DuccId; +import org.apache.uima.ducc.transport.event.common.IDuccWorkJob; +import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation; +import org.apache.uima.ducc.transport.event.common.IDuccWorkService; + +/** + * Toy orientdb loader to load a historydb from ducc history + */ + +public class DbLoader +{ + DuccLogger logger = DuccLogger.getLogger(DbLoader.class, "DBLOAD"); + + HistoryManagerDb hmd = null; + StateServicesDb ssd = null; + + // String history_url = "remote:localhost/DuccHistory"; + String state_url = "remote:localhost/DuccState"; + + // String jobHistory = System.getProperty("user.home") + "/ducc_runtime/history/jobs"; + String jobHistory = "/home/ducc/ducc_runtime/history/jobs"; + + // String reservationHistory = System.getProperty("user.home") + "/ducc_runtime/history/reservations"; + String reservationHistory = "/home/ducc/ducc_runtime/history/reservations"; + + //String serviceHistory = System.getProperty("user.home") + "/ducc_runtime/history/services"; + String serviceHistory = "/home/ducc/ducc_runtime/history/services"; + + //String serviceHistory = System.getProperty("user.home") + "/ducc_runtime/history/services"; + String serviceRegistryHistory = "/home/ducc/ducc_runtime/history/services-registry"; + + //String serviceRegistry = System.getProperty("user.home") + "/ducc_runtime/state/services"; + String serviceRegistry = "/home/ducc/ducc_runtime/state/services"; + + int nthreads = 40; + AtomicInteger counter = new AtomicInteger(0); + + public DbLoader() + throws Exception + { + // System.setProperty("ducc.history.database.url", history_url); + System.setProperty("ducc.state.database.url", state_url); + } + + void closeStream(InputStream in) + { + try { in.close(); } catch(Exception e) {} + } + + public void 
loadJobs() + { + String methodName = "loadJobs"; + LinkedBlockingQueue<File> jobqueue = new LinkedBlockingQueue<File>(); + + int max_to_load = Integer.MAX_VALUE; // or Integer.MAX_VALUE for 'all of them' + int nth = Math.min(nthreads, max_to_load); + JobLoader[] loader = new JobLoader[nth]; + Thread[] threads = new Thread[nth]; + List<Long> ids = new ArrayList<Long>(); + + for ( int i = 0; i < nth; i++ ) { + loader[i] = new JobLoader(jobqueue, ids); + threads[i] = new Thread(loader[i]); + threads[i].start(); + } + + File dir = new File(jobHistory); + File[] files = dir.listFiles(); + logger.info(methodName, null, "Reading", files.length, "jobs."); + + int c = 0; + for ( File f : files) { + String s = f.toString(); + if ( s.endsWith(".dwj") ) { + logger.info(methodName, null, "Loading file", c++, ":", f); + jobqueue.offer(f); + counter.getAndIncrement(); + + if ( c >= max_to_load ) break; + } else { + logger.info(methodName, null, "Can't find history file", f); + } + } + + while ( (c = counter.get()) != 0 ) { + try { + logger.info(methodName, null, "Waiting for loads to finish, counter is", c, "(job)."); + Thread.sleep(1000); + } + catch ( Exception e ) {} + } + + for ( int i = 0; i < nth; i++ ) { + logger.info(methodName, null, "Intterupt thread (job)", i); + threads[i].interrupt(); + } + + for ( int i = 0; i < nth; i++ ) { + logger.info(methodName, null, "Joining thread (job)", i); + try { threads[i].join(); } catch ( InterruptedException e ) {} + } + + } + + public void loadReservations() + { + String methodName = "loadReservations"; + + LinkedBlockingQueue<IDuccWorkReservation> queue = new LinkedBlockingQueue<IDuccWorkReservation>(); + + int max_to_load = Integer.MAX_VALUE; + int nth = Math.min(nthreads, max_to_load); + ReservationLoader[] loader = new ReservationLoader[nth]; + Thread[] threads = new Thread[nth]; + ArrayList<Long> ids = new ArrayList<Long>(); + + for ( int i = 0; i < nth; i++ ) { + loader[i] = new ReservationLoader(queue, ids); + threads[i] = 
new Thread(loader[i]); + threads[i].start(); + } + + File dir = new File(reservationHistory); + int c = 0; + + File[] files = dir.listFiles(); + logger.info(methodName, null, "Reading", files.length, "reservation instances."); + for ( File f : dir.listFiles() ) { + String s = f.toString(); + if ( s.endsWith(".dwr") ) { + logger.info(methodName, null, "Loading file", c++, ":", f); + IDuccWorkReservation res = null; + FileInputStream fis = null; + ObjectInputStream in = null; + + try { + long now = System.currentTimeMillis(); + fis = new FileInputStream(f); + in = new ObjectInputStream(fis); + res = (IDuccWorkReservation) in.readObject(); + logger.info(methodName, res.getDuccId(), "Time to read reservation:", System.currentTimeMillis() - now); + + queue.offer(res); + counter.getAndIncrement(); + } catch(Exception e) { + logger.info(methodName, null, e); + } finally { + // restoreJob(job.getDuccId().getFriendly()); + closeStream(in); + closeStream(fis); + if ( c >= max_to_load ) break; + } + } else { + logger.info(methodName, null, "Can't find history file", f); + } + + } + + while ( (c = counter.get()) != 0 ) { + try { + logger.info(methodName, null, "Waiting for reservation loads to finish, counter is", c); + Thread.sleep(1000); + } + catch ( Exception e ) {} + } + + for ( int i = 0; i < nth; i++ ) { + logger.info(methodName, null, "Intterupt thread (reservations).", i); + threads[i].interrupt(); + } + + for ( int i = 0; i < nth; i++ ) { + logger.info(methodName, null, "Joining thread (reservations).", i); + try { threads[i].join(); } catch ( InterruptedException e ) {} + } + + // try { + // List<IDuccWorkReservation> ress = hmd.restoreReservations(c); + // logger.info(methodName, null, "Recovered", ress.size(), "reservations."); + // } catch (Exception e) { + // // TODO Auto-generated catch block + // e.printStackTrace(); + // } + } + + + public void loadServices() + { + String methodName = "loadServices"; + LinkedBlockingQueue<IDuccWorkService> queue = new 
LinkedBlockingQueue<IDuccWorkService>(); + + int max_to_load = Integer.MAX_VALUE; + int nth = Math.min(nthreads, max_to_load); + ServiceLoader[] loader = new ServiceLoader[nth]; + Thread[] threads = new Thread[nth]; + ArrayList<Long> ids = new ArrayList<Long>(); + + for ( int i = 0; i < nth; i++ ) { + loader[i] = new ServiceLoader(queue, ids); + threads[i] = new Thread(loader[i]); + threads[i].start(); + } + + File dir = new File(serviceHistory); + int c = 0; + + File[] files = dir.listFiles(); + logger.info(methodName, null, "Reading", files.length, "service instances."); + for ( File f : files ) { + String s = f.toString(); + if ( s.endsWith(".dws") ) { + logger.info(methodName, null, "Loading file", c++, ":", f); + IDuccWorkService svc = null; + FileInputStream fis = null; + ObjectInputStream in = null; + + try { + long now = System.currentTimeMillis(); + fis = new FileInputStream(f); + in = new ObjectInputStream(fis); + svc = (IDuccWorkService) in.readObject(); + logger.info(methodName, svc.getDuccId(), "Time to read service:", System.currentTimeMillis() - now); + + + queue.offer(svc); + counter.getAndIncrement(); + } catch(Exception e) { + logger.info(methodName, null, e); + } finally { + // restoreJob(job.getDuccId().getFriendly()); + closeStream(in); + closeStream(fis); + if ( c >= max_to_load ) break; + } + } else { + logger.info(methodName, null, "Can't find history file", f); + } + } + + while ( (c = counter.get()) != 0 ) { + try { + logger.info(methodName, null, "Waiting for service loads to finish, counter is", c); + Thread.sleep(1000); + } + catch ( Exception e ) {} + } + + for ( int i = 0; i < nth; i++ ) { + logger.info(methodName, null, "Intterupt thread (services).", i); + threads[i].interrupt(); + } + + for ( int i = 0; i < nth; i++ ) { + logger.info(methodName, null, "Joining thread (services).", i); + try { threads[i].join(); } catch ( InterruptedException e ) {} + } + + // try { + // List<IDuccWorkService> services = hmd.restoreServices(c); + // 
logger.info(methodName, null, "Recovered", services.size(), "serves."); + // } catch (Exception e) { + // // TODO Auto-generated catch block + // e.printStackTrace(); + // } + + } + + public void loadServiceRegistry(String registry) + { + String methodName = "loadServiceRegistry"; + + LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(); + + int max_to_load = Integer.MAX_VALUE; + int nth = Math.min(nthreads, max_to_load); + ServiceRegistrationLoader[] loader = new ServiceRegistrationLoader[nth]; + Thread[] threads = new Thread[nth]; + ArrayList<Long> ids = new ArrayList<Long>(); + + for ( int i = 0; i < nth; i++ ) { + loader[i] = new ServiceRegistrationLoader(queue, ids); + threads[i] = new Thread(loader[i]); + threads[i].start(); + } + + int c = 0; + File dir = new File(registry); + File[] files = dir.listFiles(); + logger.info(methodName, null, "Reading", files.length, "service files (2 per instance)."); + for ( File f : files ) { + String s = f.toString(); + if ( s.endsWith(".svc") ) { + int ndx = s.indexOf(".svc"); + String numeric = s.substring(0, ndx); + queue.offer(numeric); + counter.getAndIncrement(); + + if ( ++c >= max_to_load ) break; + } + + } + + while ( (c = counter.get()) != 0 ) { + try { + logger.info(methodName, null, "Waiting for service registry loads to finish, counter is", c); + Thread.sleep(1000); + } + catch ( Exception e ) {} + } + + for ( int i = 0; i < nth; i++ ) { + logger.info(methodName, null, "Intterupt thread (service registry).", i); + threads[i].interrupt(); + } + + for ( int i = 0; i < nth; i++ ) { + logger.info(methodName, null, "Joining thread (service registry).", i); + try { threads[i].join(); } catch ( InterruptedException e ) {} + } + + } + + void run() + throws Exception + { + String methodName = "run"; + + DbCreate cr = new DbCreate(state_url); + cr.createDatabase(); + + // load the history db + if ( true ) { + try { + + hmd = new HistoryManagerDb(logger); + + // ---------- Load job history + loadJobs(); + 
if ( true ) return; + + // ---------- Load reservation history + loadReservations(); + + // ---------- Load service isntance and AP history + loadServices(); + + // ---------- Load service registry + ssd = new StateServicesDb(); + ssd.init(logger); + loadServiceRegistry(serviceRegistry); + try { + ssd.shutdown(); + } catch ( Exception e ) { + e.printStackTrace(); + } + + // ---------- Load service registry history + ssd = new StateServicesDb(); + ssd.init(logger); + loadServiceRegistry(serviceRegistryHistory); + } catch ( Exception e ) { + logger.error(methodName, null, e); + + } finally { + if ( hmd != null ) hmd.shutdown(); + if ( ssd != null ) ssd.shutdown(); + } + } + + // load the service registry + // loadServiceRegistry(); + } + + public static void main(String[] args) + { + DbLoader dbl = null; + try { + dbl = new DbLoader(); + dbl.run(); + } catch ( Exception e ) { + e.printStackTrace(); + } + } + + class JobLoader + implements Runnable + { + BlockingQueue<File> queue; + List<Long> ids; + JobLoader(BlockingQueue<File> queue, List<Long> ids) + { + this.queue = queue; + this.ids = ids; + } + + public void run() + { + String methodName = "JobLoader.run"; + while ( true ) { + File f = null; + IDuccWorkJob job = null; + try { + // logger.info(methodName, null, "About to take (job)."); + f = queue.take(); + + FileInputStream fis = null; + ObjectInputStream in = null; + + try { + long now = System.currentTimeMillis(); + fis = new FileInputStream(f); + in = new ObjectInputStream(fis); + job = (IDuccWorkJob) in.readObject(); + logger.info(methodName, job.getDuccId(), "Time to read job:", System.currentTimeMillis() - now); + } catch(Exception e) { + logger.info(methodName, null, e); + } finally { + // restoreJob(job.getDuccId().getFriendly()); + closeStream(in); + closeStream(fis); + } + hmd.saveJobUnsafe(job); + + } catch ( InterruptedException e ) { + return; + } catch(Exception e) { + logger.info(methodName, null, e); + } + + synchronized(ids) { + 
ids.add(job.getDuccId().getFriendly()); + } + counter.getAndDecrement(); + } + } + } + + + class ServiceLoader + implements Runnable + { + BlockingQueue<IDuccWorkService> queue; + List<Long> ids; + ServiceLoader(BlockingQueue<IDuccWorkService> queue, List<Long> ids) + { + this.queue = queue; + this.ids = ids; + } + + public void run() + { + String methodName = "ServiceLoader.run"; + while ( true ) { + IDuccWorkService svc = null; + try { + logger.info(methodName, null, "About to take (service)."); + svc = queue.take(); + } catch ( InterruptedException e ) { + return; + } + logger.info(methodName, svc.getDuccId(), "Took a Service"); + try { + //h = dbManager.open(); + hmd.saveServiceUnsafe(svc); + //h.close(); + synchronized(ids) { + ids.add(svc.getDuccId().getFriendly()); + } + counter.getAndDecrement(); + } catch(Exception e) { + logger.info(methodName, null, e); + } + } + } + } + + class ReservationLoader + implements Runnable + { + BlockingQueue<IDuccWorkReservation> queue; + List<Long> ids; + ReservationLoader(BlockingQueue<IDuccWorkReservation> queue, List<Long> ids) + { + this.queue = queue; + this.ids = ids; + } + + public void run() + { + String methodName = "ReservationLoader.run"; + while ( true ) { + IDuccWorkReservation res = null; + try { + logger.info(methodName, null, "About to take (reservation)."); + res = queue.take(); + } catch ( InterruptedException e ) { + return; + } + logger.info(methodName, res.getDuccId(), "Took a Service"); + try { + //h = dbManager.open(); + hmd.saveReservationUnsafe(res); + //h.close(); + synchronized(ids) { + ids.add(res.getDuccId().getFriendly()); + } + counter.getAndDecrement(); + } catch(Exception e) { + logger.info(methodName, null, e); + } + } + } + } + + + class ServiceRegistrationLoader + implements Runnable + { + BlockingQueue<String> queue; + List<Long> ids; + ServiceRegistrationLoader(BlockingQueue<String> queue, List<Long> ids) + { + this.queue = queue; + this.ids = ids; + } + + public void run() + { + String 
methodName = "ServiceRegistrationLoader.run"; + while ( true ) { + String id = null; + try { + logger.info(methodName, null, "About to take (service id)."); + id = queue.take(); + } catch ( InterruptedException e ) { + return; + } + logger.info(methodName, null, id, "Took a service id"); + FileInputStream svc_in = null; + FileInputStream meta_in = null; + try { + Properties svc_props = new Properties(); + Properties meta_props = new Properties(); + + String svc_name = id + ".svc"; + String meta_name = id + ".meta"; + + svc_in = new FileInputStream(svc_name); + meta_in = new FileInputStream(meta_name); + svc_props.load(svc_in); + meta_props.load(meta_in); + String sid = meta_props.getProperty(IStateServices.SvcProps.numeric_id.pname()); + if ( sid == null ) { + logger.warn(methodName, null, "Cannot find service id in meta file for", id); + } else { + if ( id.indexOf(sid) < 0 ) { + // must do index of because the 'id' is a full path, not just the numeric id. so we + // are satisfied with making sure the id is in the path. 
+ throw new IllegalStateException("Service id and internal id do not match."); + } + DuccId did = new DuccId(Long.parseLong(sid)); + + ssd.storePropertiesUnsafe(did, svc_props, meta_props); + + synchronized(ids) { + ids.add(did.getFriendly()); + } + } + counter.getAndDecrement(); + } catch(Exception e) { + logger.info(methodName, null, e); + } finally { + closeStream(svc_in); + closeStream(meta_in); + } + + } + } + } + + +} Added: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java?rev=1703203&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java Tue Sep 15 14:31:24 2015 @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+*/ + +package org.apache.uima.ducc.database; + +import java.io.FileInputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.uima.ducc.common.utils.DuccLogger; +import org.apache.uima.ducc.database.DbConstants.DbEdge; +import org.apache.uima.ducc.database.DbConstants.DbVertex; + +import com.orientechnologies.orient.client.remote.OServerAdmin; +import com.tinkerpop.blueprints.impls.orient.OrientGraph; +import com.tinkerpop.blueprints.impls.orient.OrientGraphFactory; +import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx; + +public class DbManager +{ + + String dburl; + DuccLogger logger; + + OrientGraphFactory factory; + + Map<String, DbVertex> name_to_vertex = new HashMap<String, DbVertex>(); + Map<String, DbEdge> name_to_edge = new HashMap<String, DbEdge>(); + + // private ODatabaseDocumentTx documentDb; + + public DbManager(String dburl, DuccLogger logger) + throws Exception + { + this.dburl = dburl; + this.logger = logger; + + for ( DbVertex o : DbVertex.values() ) { + name_to_vertex.put(o.pname(), o); + } + for ( DbEdge o : DbEdge.values() ) { + name_to_edge.put(o.pname(), o); + } + + } + + DbVertex vertexType(String v) + { + return name_to_vertex.get(v); + } + + DbEdge EdgeType(String e) + { + return name_to_edge.get(e); + } + + + boolean checkForDatabase() + throws Exception + { + String methodName = "checkForDatabase"; + String pw = dbPassword(); + OServerAdmin admin = null; + + boolean ret = true; + try { + admin = new OServerAdmin(dburl); + admin.connect("root", pw); // connect to the server + + if ( ! 
admin.existsDatabase("plocal") ) { + logger.info(methodName, null, "Database " + dburl + " does not exist."); + ret = false; + } + } finally { + if ( admin != null) admin.close(); + } + return ret; + } + + public synchronized DbHandle open() + throws Exception + { + OrientGraph graphDb = factory.getTx(); // the graph instance + if ( graphDb == null ) { + throw new IllegalStateException("Cannot allocate graph instance for " + dburl); + } + + return new DbHandle(this, graphDb); + } + + + public synchronized DbHandle openNoTx() + throws Exception + { + OrientGraphNoTx graphDb = factory.getNoTx(); // the graph instance + if ( graphDb == null ) { + throw new IllegalStateException("Cannot allocate graph instance for " + dburl); + } + + return new DbHandle(this, graphDb); + } + + + public synchronized void init() + throws Exception + { + String methodName = "init"; + + if ( factory != null ) return; // already initialized + + if ( ! dburl.startsWith("plocal") ) { + // make sure the server is up if it's not a plocal db + if ( !checkForDatabase() ) { + throw new IllegalStateException("Database does not exist and must be created:" + dburl); + } + } + + factory = new OrientGraphFactory(dburl); + if ( factory == null ) { + throw new IllegalStateException("Cannot create graph factory for " + dburl); + } + logger.info(methodName, null, "Database is opened:", dburl); + factory.setupPool(1,20); + } + + public synchronized void shutdown() + { + String methodName = "closeDatabase"; + logger.info(methodName, null, "Closing the database."); + if ( factory != null ) { + // closes all pooled instances and stops the factory + factory.close(); + factory = null; + } + } + + + static String dbPassword() + throws Exception + { + // logger.info(methodName, null, "Opening service database at: " + dburl); + Properties props = new Properties(); + FileInputStream fis = new FileInputStream(System.getProperty("DUCC_HOME") + "/resources.private/db_password"); + props.load(fis); + fis.close(); + + 
String pw = props.getProperty("db_password"); + if ( pw == null ) { + throw new IllegalStateException("Cannot acquire the database password."); + } + return pw; + } + + + public static void main(String[] args) + { + } + +} Added: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbObject.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbObject.java?rev=1703203&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbObject.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbObject.java Tue Sep 15 14:31:24 2015 @@ -0,0 +1,33 @@ +package org.apache.uima.ducc.database; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.uima.ducc.database.DbConstants.DbVertex; + + +/** + * Simple holder class to return stuff put into the database. + */ +public class DbObject +{ + String json; + DbVertex type; + List<DbObject> embedded; + + DbObject(String json, DbVertex type) + { + embedded = new ArrayList<DbObject>(); + this.json = json; + this.type = type; + } + + void addEmbedded(DbObject obj) + { + embedded.add(obj); + } + + public String getJson() { return json; } + public DbVertex getType() { return type; } + public List<DbObject> getEmbedded() { return embedded; } +}