Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java?rev=1711088&r1=1711087&r2=1711088&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java Wed Oct 28 18:12:53 2015 @@ -20,180 +20,134 @@ package org.apache.uima.ducc.database; import java.io.FileInputStream; -import java.util.HashMap; -import java.util.Map; import java.util.Properties; import org.apache.uima.ducc.common.utils.DuccLogger; -import org.apache.uima.ducc.database.DbConstants.DbEdge; -import org.apache.uima.ducc.database.DbConstants.DbVertex; - -import com.orientechnologies.orient.client.remote.OServerAdmin; -import com.orientechnologies.orient.core.intent.OIntent; -import com.tinkerpop.blueprints.impls.orient.OrientGraph; -import com.tinkerpop.blueprints.impls.orient.OrientGraphFactory; -import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Host; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; + +/** + * Provide a common point for contacting the db, acquiring sessions/handles to it, managing the db, + * closing, etc. + */ public class DbManager { String dburl; DuccLogger logger; - OrientGraphFactory factory; - - Map<String, DbVertex> name_to_vertex = new HashMap<String, DbVertex>(); - Map<String, DbEdge> name_to_edge = new HashMap<String, DbEdge>(); - - // private ODatabaseDocumentTx documentDb; + private Cluster cluster; // only one + private Session session; // only one - it's thread safe and manages a connection pool public DbManager(String dburl, DuccLogger logger) throws Exception { this.dburl = dburl; this.logger = logger; - - for ( DbVertex o : DbVertex.values() ) { - name_to_vertex.put(o.pname(), o); - } - for ( DbEdge o : DbEdge.values() ) { - name_to_edge.put(o.pname(), o); - } - } - - DbVertex vertexType(String v) - { - return name_to_vertex.get(v); - } - - DbEdge EdgeType(String e) - { - return name_to_edge.get(e); - } - boolean checkForDatabase() throws Exception { String methodName = "checkForDatabase"; - String pw = dbPassword(); - OServerAdmin admin = null; - - boolean ret = true; - try { - admin = new OServerAdmin(dburl); - admin.connect("root", pw); // connect to the server - - if ( ! admin.existsDatabase("plocal") ) { - logger.info(methodName, null, "Database " + dburl + " does not exist."); - ret = false; - } - } finally { - if ( admin != null) admin.close(); - } - return ret; + logger.warn(methodName, null, "Not yet implemented."); + return true; } public void drop() throws Exception { - if ( dburl.startsWith("remote") ) { - OServerAdmin admin = null; - try { - String pw = dbPassword(); - admin = new OServerAdmin(dburl); - admin.connect("root", pw); // connect to the server - admin.dropDatabase("plocal"); - admin.close(); - } finally { - if ( admin != null ) admin.close(); - } - } else { - OrientGraphNoTx graphDb = factory.getNoTx(); // the graph instance - if ( graphDb == null ) { - throw new IllegalStateException("Cannot allocate graph instance for " + dburl); - } - graphDb.drop(); - } + String methodName = "drop"; + logger.warn(methodName, null, "Drop is not implemented yet."); } public synchronized DbHandle open() throws Exception { - OrientGraph graphDb = factory.getTx(); // the graph instance - if ( graphDb == null ) { - throw new IllegalStateException("Cannot allocate graph instance for " + dburl); + if ( session == null ) { + session = cluster.connect(); } - - graphDb.setUseLightweightEdges(true); - return new DbHandle(this, graphDb); + + return new DbHandle(this); } - public synchronized DbHandle openNoLog() - throws Exception + public synchronized void init() + throws Exception { - OrientGraph graphDb = factory.getTx(); // the graph instance - if ( graphDb == null ) { - throw new IllegalStateException("Cannot allocate graph instance for " + dburl); - } + String methodName = "init"; + + if ( cluster != null ) return; // already initialized + + cluster = Cluster.builder() + .addContactPoint(dburl) + .build(); + + Metadata metadata = cluster.getMetadata(); + logger.info(methodName, null, "Connected to cluster: %s\n", metadata.getClusterName()); - graphDb.setUseLightweightEdges(true); - graphDb.getRawGraph().getTransaction().setUsingLog(false); - return new DbHandle(this, graphDb); + for ( Host host : metadata.getAllHosts() ) { + logger.info(methodName, null, "Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack()); + } } - - public synchronized DbHandle openNoTx() - throws Exception + public synchronized void shutdown() { - OrientGraphNoTx graphDb = factory.getNoTx(); // the graph instance - if ( graphDb == null ) { - throw new IllegalStateException("Cannot allocate graph instance for " + dburl); - } - graphDb.setUseLightweightEdges(true); - return new DbHandle(this, graphDb); + String methodName = "closeDatabase"; + logger.info(methodName, null, "Closing the database."); + cluster.close(); + cluster = null; + session = null; } - - public synchronized void init() - throws Exception + PreparedStatement prepare(String cql) { - String methodName = "init"; + return session.prepare(cql); + } - if ( factory != null ) return; // already initialized + void truncate(String table) + throws Exception + { + execute("TRUNCATE " + table); + } - if ( ! dburl.startsWith("plocal") ) { - // make sure the server is up if it's not a plocal db - if ( !checkForDatabase() ) { - throw new IllegalStateException("Database does not exist and must be created:" + dburl); - } + String truncateText(String s) + { + String ret = s; + if ( ret.length() > 200 ) { + ret = s.substring(0, 200) + " ... "; } - - factory = new OrientGraphFactory(dburl); - if ( factory == null ) { - throw new IllegalStateException("Cannot create graph factory for " + dburl); - } - logger.info(methodName, null, "Database is opened:", dburl); - factory.setupPool(1,20); + return ret; } - public synchronized void declareIntent(OIntent intent) + ResultSet execute(String cql) { - factory.declareIntent(intent); + String methodName = "execute"; + if ( logger.isDebug() ) { + logger.info(methodName, null, "EXECUTE CQL:", cql); + } else { + logger.info(methodName, null, "EXECUTE CQL:", truncateText(cql)); + } + return session.execute(cql); } - public synchronized void shutdown() + ResultSet execute(BoundStatement s) { - String methodName = "closeDatabase"; - logger.info(methodName, null, "Closing the database."); - if ( factory != null ) { - // closes all pooled instances and stops the factory - factory.close(); - factory = null; - } + return session.execute(s); } + ResultSet execute(SimpleStatement s) + { + String methodName = "execute"; + logger.info(methodName, null, "EXECUTE STATEMENT:", truncateText(s.getQueryString())); + return session.execute(s); + } static String dbPassword() throws Exception
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java?rev=1711088&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java Wed Oct 28 18:12:53 2015 @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +*/ + +package org.apache.uima.ducc.database; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.uima.ducc.common.persistence.IDbProperty; +import org.apache.uima.ducc.common.persistence.IDbProperty.Type; + +/** + * Static common helper methods. + * + * Not public at this point, would prefer to encapsulate all this entirely in DB. + */ +class DbUtil +{ + static String mkSchema(IDbProperty[] props) + throws Exception + { + List<String> parts = new ArrayList<String>(); + List<String> primaries = new ArrayList<String>(); + for (IDbProperty n : props ) { + if ( n.isMeta() ) continue; + String s = n.columnName() + " " + typeToString(n.type()); + if ( n.isPrimaryKey() ) { + primaries.add(n.columnName()); + } + parts.add(s); + } + if ( primaries.size() == 0 ) { + throw new IllegalArgumentException("Schema properties must declare at least one primary key."); + } + StringBuffer buf = new StringBuffer(); + for ( String p : parts ) { + buf.append(p); + buf.append(","); + } + int ncommas = primaries.size() - 1; + int c = 0; + buf.append(" PRIMARY KEY("); + for ( String s : primaries ) { + buf.append(s); + if ( c++ < ncommas ) { + buf.append(","); + } + } + + buf.append(")"); + return buf.toString(); + } + + static String mkFields(StringBuffer buf, String[] fields) + { + int max = fields.length - 1; + int current = 0; + buf.append("("); + for (String s : fields) { + buf.append(s); + if ( current++ < max) buf.append(", "); + } + buf.append(")"); + return buf.toString(); + + } + + /** + * Generate a CREATE TABLE statement from the incoming fields. The preparer of the + * fields must qualify any fields in advance e.g. with types, key attributes, etc. + * + * @param tableName This is the name of the table to create. + * @param fields This is a string array of fields to generate the statement from. + * + * @return A string of valid SQL / CQL used to create the table. + */ + static String mkTableCreate(String tableName, String[] fields) + { + int max = fields.length - 1; + int current = 0; + StringBuffer buf = new StringBuffer("CREATE TABLE IF NOT EXISTS "); + buf.append(tableName); + buf.append(" ("); + for (String s : fields) { + buf.append(s); + if ( current++ < max) buf.append(", "); + } + buf.append(")"); + return buf.toString(); + } + + static String mkInsert(String tableName, Map<? extends IDbProperty, Object> props) + { + int max = props.size() - 1; + int current = 0; + StringBuffer buf = new StringBuffer("INSERT INTO "); + buf.append(tableName); + buf.append("("); + + StringBuffer vals = new StringBuffer(") VALUES ("); + + for ( IDbProperty ok : props.keySet() ) { + + String k = ok.columnName(); + buf.append(k); + vals.append(rep(ok.type(), props.get(ok))); + + if ( current++ < max ) { + buf.append(","); + vals.append(","); + } + } + buf.append(vals.toString()); + buf.append(")"); + + return buf.toString(); + } + + static String mkInsert(String tableName, Object key, Object keyval, Map<? extends IDbProperty, Object> props) + { + int max = props.size() + 1; + int current = 0; + StringBuffer buf = new StringBuffer("INSERT INTO "); + buf.append(tableName); + buf.append("("); + + StringBuffer vals = new StringBuffer(") VALUES ("); + + buf.append(key.toString()); + buf.append(","); + vals.append(keyval.toString()); + vals.append(","); + + for ( IDbProperty ok : props.keySet() ) { + + String k = ok.columnName(); + buf.append(k); + vals.append(rep(ok.type(), props.get(ok))); + + if ( current++ < max ) { + buf.append(","); + vals.append(","); + } + } + buf.append(vals.toString()); + buf.append(")"); + + return buf.toString(); + + } + + static String mkUpdate(String table, String key, Object... props) + { + int len = props.length; + StringBuffer buf = new StringBuffer("UPDATE "); + buf.append(table); + buf.append(" SET "); + + for ( int i = 0; i < len; i+=2) { + IDbProperty prop = (IDbProperty) props[i]; + if ( !prop.isPrimaryKey()) { // not allowed to update this + // we allow it in 'props' so callers can + // simply call update and expect the right + // thing to happen + + buf.append(prop.columnName()); + buf.append("="); + buf.append(rep(prop.type(), props[i+1])); + if ( i + 2 < len ) { + buf.append(","); + } + } + } + buf.append(" WHERE "); + buf.append(key); + return buf.toString(); + } + + /** + * Return the correct representation for CQL update, of val, for the indicated type, for this database. + */ + static String rep(Type t, Object val) + { + switch ( t ) { + case String: + return "'" + val.toString() + "'"; + default: + return val.toString(); + } + + } + + /** + * Convert our generic "type" to the right name for this db implementation. + * We could make Type a magic enum but I want to hide DB specifics, in particular, + * how this database names various java types. + */ + static String typeToString(Type t) + { + switch ( t ) { + case Blob: + return "blob"; + case String: + return "text"; + case Boolean: + return "boolean"; + case Integer: + return "int"; + case Long: + return "bigint"; + case Double: + return "double"; + case UUID: + return "uuid"; + } + throw new IllegalArgumentException("Unrecognized type for schema: " + t); + } + +} Added: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbVerify.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbVerify.java?rev=1711088&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbVerify.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbVerify.java Wed Oct 28 18:12:53 2015 @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.uima.ducc.database; + +import java.io.ByteArrayInputStream; +import java.io.ObjectInputStream; +import java.nio.ByteBuffer; +import java.util.Map; + +import org.apache.uima.ducc.common.persistence.services.StateServicesDirectory; +import org.apache.uima.ducc.common.persistence.services.StateServicesSet; +import org.apache.uima.ducc.common.utils.DuccLogger; +import org.apache.uima.ducc.common.utils.id.DuccId; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.SimpleStatement; + +/** + * Toy orientdb loader to load a historydb from ducc history + */ + +public class DbVerify +{ + DuccLogger logger = DuccLogger.getLogger(DbLoader.class, "DBVERIFY"); + String DUCC_HOME; + + DbManager dbManager = null; + long total_bytes = 0; + + public DbVerify() + throws Exception + { + //String methodName = "<ctr>"; + DUCC_HOME = System.getProperty("DUCC_HOME"); + if ( DUCC_HOME == null ) { + System.out.println("System proprety -DDUCC_HOME must be set."); + System.exit(1); + } + } + + + void verify(String table) + throws Exception + { + String methodName = "verify"; + DbHandle h = dbManager.open(); + SimpleStatement s = new SimpleStatement("SELECT * from " + table); + //SimpleStatement s = new SimpleStatement("SELECT * from " + table + " LIMIT 10"); // for test and debug + logger.info(methodName, null, "Fetch size", s.getFetchSize()); + s.setFetchSize(100); + long now = System.currentTimeMillis(); + + int counter = 0; + int nbytes = 0; + try { + ResultSet rs = h.execute(s); + for ( Row r : rs ) { + counter++; + ByteBuffer b = r.getBytes("work"); + byte[] bytes = b.array(); + nbytes += bytes.length; + total_bytes += bytes.length;; + + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + ObjectInputStream ois = new ObjectInputStream(bais); + Object o = ois.readObject(); + ois.close(); + DuccId did = new DuccId(r.getLong("ducc_dbid")); + + logger.info(methodName, did, "found object class", o.getClass().getName(), "of type", r.getString("type"), "in table", table, "of size", bytes.length); + } + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + logger.info(methodName, null, "Found", counter, "results. Total bytes", nbytes); + logger.info(methodName, null, "Total time for", table, System.currentTimeMillis() - now); + } + + void verifyServices() + throws Exception + { + String methodName = "verify"; + int live = 0; + int archived = 0; + StateServicesDb sdb = new StateServicesDb(); + sdb.init(logger,dbManager); + + StateServicesDirectory ssd = sdb.fetchServices(true); // first the archived stuff + Map<Long, StateServicesSet> svcmap = ssd.getMap(); + for ( Long id : svcmap.keySet() ) { + DuccId did = new DuccId(id); + archived++; + logger.info(methodName, did, "Found an archived service."); + } + + ssd = sdb.fetchServices(false); // first the archived stuff + svcmap = ssd.getMap(); + for ( Long id : svcmap.keySet() ) { + DuccId did = new DuccId(id); + logger.info(methodName, did, "Found a live service."); + live++; + } + logger.info(methodName, null, "Found", live, "live services and", archived, "archived services."); + + } + + void run() + throws Exception + { + String methodName = "run"; + long now = System.currentTimeMillis(); + String state_url = "bluej538"; + try { + dbManager = new DbManager(state_url, logger); + dbManager.init(); + + verifyServices(); + + if ( false ) verify("ducc.res_history"); + if ( false ) verify("ducc.svc_history"); + if ( false ) verify("ducc.job_history"); + + } finally { + dbManager.shutdown(); + } + logger.info(methodName, null, "Read", total_bytes, "bytes in", System.currentTimeMillis() - now, "MS"); + } + + + public static void main(String[] args) + { + DbVerify v = null; + try { + v = new DbVerify(); + v.run(); + } catch ( Exception e ) { + e.printStackTrace(); + } + } +} Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java?rev=1711088&r1=1711087&r2=1711088&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java Wed Oct 28 18:12:53 2015 @@ -18,52 +18,33 @@ */ package org.apache.uima.ducc.database; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Set; -import org.apache.uima.ducc.common.SizeBytes; -import org.apache.uima.ducc.common.main.DuccService; +import org.apache.uima.ducc.common.Pair; import org.apache.uima.ducc.common.utils.DuccLogger; import org.apache.uima.ducc.common.utils.id.DuccId; -import org.apache.uima.ducc.database.DbConstants.DbCategory; -import org.apache.uima.ducc.database.DbConstants.DbEdge; -import org.apache.uima.ducc.database.DbConstants.DbVertex; -import org.apache.uima.ducc.transport.event.common.ADuccWork; -import org.apache.uima.ducc.transport.event.common.DuccProcess; -import org.apache.uima.ducc.transport.event.common.DuccProcessMap; -import org.apache.uima.ducc.transport.event.common.DuccReservation; -import org.apache.uima.ducc.transport.event.common.DuccWorkJob; import org.apache.uima.ducc.transport.event.common.DuccWorkMap; -import org.apache.uima.ducc.transport.event.common.DuccWorkPopDriver; import org.apache.uima.ducc.transport.event.common.DuccWorkReservation; -import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.JobCompletionType; -import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.ReservationCompletionType; -import org.apache.uima.ducc.transport.event.common.IDuccPerWorkItemStatistics; -import org.apache.uima.ducc.transport.event.common.IDuccProcess; -import org.apache.uima.ducc.transport.event.common.IDuccProcessMap; -import org.apache.uima.ducc.transport.event.common.IDuccReservation; -import org.apache.uima.ducc.transport.event.common.IDuccReservationMap; -import org.apache.uima.ducc.transport.event.common.IDuccState.JobState; -import org.apache.uima.ducc.transport.event.common.IDuccState.ReservationState; +import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType; import org.apache.uima.ducc.transport.event.common.IDuccWork; import org.apache.uima.ducc.transport.event.common.IDuccWorkJob; import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation; import org.apache.uima.ducc.transport.event.common.IDuccWorkService; -import org.apache.uima.ducc.transport.event.common.JdReservationBean; import org.apache.uima.ducc.transport.event.common.history.IHistoryPersistenceManager; -import com.google.gson.Gson; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.SimpleStatement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import com.orientechnologies.orient.core.intent.OIntentMassiveInsert; -import com.orientechnologies.orient.core.record.impl.ODocument; -import com.tinkerpop.blueprints.Direction; -import com.tinkerpop.blueprints.Edge; -import com.tinkerpop.blueprints.Vertex; -import com.tinkerpop.blueprints.impls.orient.OrientEdge; -import com.tinkerpop.blueprints.impls.orient.OrientVertex; public class HistoryManagerDb @@ -72,265 +53,225 @@ public class HistoryManagerDb private DuccLogger logger = null; - private String dburl; private DbManager dbManager; + + PreparedStatement jobPrepare = null; + PreparedStatement reservationPrepare = null; + PreparedStatement servicePrepare = null; + PreparedStatement ckptPrepare = null; + static final String JOB_TABLE = "ducc." + OrWorkProps.JOB_TABLE.pname(); + static final String RES_TABLE = "ducc." + OrWorkProps.RESERVATION_TABLE.pname(); + static final String SVC_TABLE = "ducc." + OrWorkProps.SERVICE_TABLE.pname(); + static final String CKPT_TABLE = "ducc." + OrCkptProps.CKPT_TABLE.pname(); public HistoryManagerDb() { - this(DuccService.getDuccLogger(HistoryManagerDb.class.getName())); } - - public HistoryManagerDb(DuccLogger logger) - { - - this.logger = logger; - dburl = System.getProperty("ducc.state.database.url"); + + + private boolean init(String dburl, DbManager dbm) + throws Exception + { + String methodName = "init"; + boolean ret = true; + logger.info(methodName, null, "Initializing OR persistence over the database"); try { - dbManager = new DbManager(dburl, logger); - dbManager.init(); - // TODO TODO - dbManager.declareIntent(new OIntentMassiveInsert()); - logger.warn("<CTR>.HistoryManagerDb", null, "****MUST FIX DECLARE INTENT****"); + if ( dbm != null ) { + this.dbManager = dbm; + } else { + dbManager = new DbManager(dburl, logger); + dbManager.init(); + } + + // prepare some statements + DbHandle h = dbManager.open(); + jobPrepare = h.prepare("INSERT INTO " + JOB_TABLE + " (ducc_dbid, type, history, work) VALUES (?, ?, ?, ?) IF NOT EXISTS;"); + reservationPrepare = h.prepare("INSERT INTO " + RES_TABLE + " (ducc_dbid, type, history, work) VALUES (?, ?, ?, ?) IF NOT EXISTS;"); + servicePrepare = h.prepare("INSERT INTO " + SVC_TABLE + " (ducc_dbid, type, history, work) VALUES (?, ?, ?, ?) IF NOT EXISTS;"); + ckptPrepare = h.prepare("INSERT INTO " + CKPT_TABLE + " (id, work, p2jmap) VALUES (?, ?, ?);"); + } catch ( Exception e ) { - logger.error("HisstoryManagerDb", null, "Cannot open the history database:", e); - } + logger.error(methodName, null, "Cannot open the history database:", e); + throw e; + } + return ret; } - public void setLogger(DuccLogger logger) + public boolean init(DuccLogger logger) + throws Exception { this.logger = logger; + String historyUrl = System.getProperty("ducc.state.database.url"); + return init(historyUrl, null); + } + + // package only, for the loader + boolean init(DuccLogger logger, DbManager dbManager) + throws Exception + { + this.logger = logger; + String stateUrl = System.getProperty("ducc.state.database.url"); + return init(stateUrl, dbManager); } - // ---------------------------------------------------------------------------------------------------- - // Jobs section /** - * Common code to save a job in an open handle. Caller will commit or fail as needed. + * Schema gen. Do anything you want to make the schema, but notice that DbUtil has a few convenience methods if + * you want to define your schema in a magic enum. */ - void saveJobNoCommit(DbHandle h, IDuccWorkJob j, DbVertex type, DbCategory dbcat) - throws Exception + static ArrayList<SimpleStatement> mkSchema(String tablename) + throws Exception { - String methodName = "saveJobNoCommit"; - Long nowP = System.currentTimeMillis(); - // Nuke the command lines - DuccWorkPopDriver driver = j.getDriver(); - //ICommandLine driverCl = null; - IDuccProcessMap jdProcessMap = null; - - int size = 0; - - if ( driver != null ) { - //driverCl = driver.getCommandLine(); - //driver.setCommandLine(null); - jdProcessMap = driver.getProcessMap(); - driver.setProcessMap(null); - } + ArrayList<SimpleStatement> ret = new ArrayList<SimpleStatement>(); - //ICommandLine jobCl = j.getCommandLine(); - //j.setCommandLine(null); + StringBuffer buf = new StringBuffer("CREATE TABLE IF NOT EXISTS " + tablename + " ("); + buf.append(DbUtil.mkSchema(OrWorkProps.values())); + buf.append(")"); + buf.append("WITH CLUSTERING ORDER BY (ducc_dbid desc)"); + + ret.add(new SimpleStatement(buf.toString())); + ret.add(new SimpleStatement("CREATE INDEX IF NOT EXISTS ON " + tablename + "(ducc_dbid)")); + ret.add(new SimpleStatement("CREATE INDEX IF NOT EXISTS ON " + tablename + "(history)")); - IDuccPerWorkItemStatistics stats = j.getSchedulingInfo().getPerWorkItemStatistics(); + return ret; + } - if ( stats != null ) { - if (Double.isNaN(stats.getStandardDeviation()) ) { - stats.setStandardDeviation(0.0); - } - } + static ArrayList<SimpleStatement> mkSchema() + throws Exception + { + ArrayList<SimpleStatement> ret = new ArrayList<SimpleStatement>(); - // Pull process map so we can put processes in their own records - IDuccProcessMap processMap = j.getProcessMap(); - j.setProcessMap(null); - - Gson g = DbHandle.mkGsonForJob(); - - String dbJob = g.toJson(j); - size += dbJob.length(); - - // Must repair these things because OR continues to use the job after it has been - // written to history. - j.setProcessMap(processMap); - //j.setCommandLine(jobCl); - if ( driver != null ) { - //driver.setCommandLine(driverCl); - driver.setProcessMap(jdProcessMap); - } - - OrientVertex savedJob = h.saveObject(type, j.getDuccId().getFriendly(), dbJob, dbcat); - - List<OrientVertex> savedJPs = new ArrayList<OrientVertex>(); - List<OrientVertex> savedJDs = new ArrayList<OrientVertex>(); - for (DuccId did : processMap.keySet()) { - Long pid = did.getFriendly(); - - IDuccProcess p = processMap.get(did); - String proc = g.toJson(p); - size += proc.length(); - - savedJPs.add(h.saveObject(DbVertex.Process, pid, proc, dbcat)); - // logger.info(methodName, p.getDuccId(), "2 ----------> Time to save process", System.currentTimeMillis() - nowP); - - } - - if ( driver != null ) { - for (DuccId did : jdProcessMap.keySet()) { - Long pid = did.getFriendly(); - - IDuccProcess p = jdProcessMap.get(did); - String proc = g.toJson(p); - size += proc.length(); - - savedJDs.add(h.saveObject(DbVertex.Process, pid, proc, dbcat)); - // logger.info(methodName, p.getDuccId(), "2 ----------> Time to save process", System.currentTimeMillis() - nowP); - - } - h.addEdges(savedJob, savedJDs, DbEdge.JdProcess); - } - - h.addEdges(savedJob, savedJPs, DbEdge.JpProcess); + ret.addAll(mkSchema(JOB_TABLE)); + ret.addAll(mkSchema(RES_TABLE)); + ret.addAll(mkSchema(SVC_TABLE)); + + StringBuffer buf = new StringBuffer("CREATE TABLE IF NOT EXISTS " + CKPT_TABLE + " ("); + buf.append(DbUtil.mkSchema(OrCkptProps.values())); + buf.append(")"); + ret.add(new SimpleStatement(buf.toString())); - logger.info(methodName, j.getDuccId(), "----------> Time to save job", System.currentTimeMillis() - nowP, "json size", size, "nprocesses", processMap.size()); - + return ret; } - - private void saveJobInternal(IDuccWorkJob j, DbVertex type, boolean safe, DbCategory dbcat) - throws Exception - { - - // It seems that services instances are represented as jobs without drivers. So we use - // the common code here, passing in the vertex type, for both jobs and services. - String methodName = "saveJob"; - logger.info(methodName, j.getDuccId(), "Saving: type:", type.pname(), "safe:", safe, "DbCategory:", dbcat); + // ---------------------------------------------------------------------------------------------------- + // Jobs section - Long id = j.getDuccId().getFriendly(); - DbHandle h = null; - try { + void saveWork(PreparedStatement s, IDuccWork w, boolean isHistory) + throws Exception + { + String methodName = "saveWork"; + Long nowP = System.currentTimeMillis(); + String type = null; + if ( w instanceof IDuccWorkJob ) { + type = "job"; + } else if ( w instanceof IDuccWorkReservation ) { + type = "reservation"; + } else if ( w instanceof IDuccWorkService ) { + type = "service"; + } else { + throw new IllegalArgumentException("Improper object passed to saveWork"); + } + + logger.info(methodName, w.getDuccId(), "-------- saving " + type); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(baos); + out.writeObject(w); + out.close(); + byte[] bytes = baos.toByteArray(); + ByteBuffer buf = ByteBuffer.wrap(bytes); - if ( safe ) { - h = dbManager.open(); - } else { - h = dbManager.openNoLog(); - } + DbHandle h = dbManager.open(); + h.saveObject(jobPrepare, w.getDuccId().getFriendly(), type, isHistory, buf); - if ( safe && h.thingInDatabase(id, type, dbcat) ) { - logger.warn(methodName, j.getDuccId(), "Not overwriting saved job."); - h.close(); - return; - } - } catch ( Exception e ) { - if ( h != null ) h.close(); - throw e; - } - - try { - saveJobNoCommit(h, j, type, dbcat); - } catch ( Exception e ) { - h.rollback(); - logger.error(methodName, j.getDuccId(), "Cannot store job", e); - throw e; - } finally { - Long nowP = System.currentTimeMillis(); - h.commit(); - logger.info(methodName, j.getDuccId(), "Time to commit", System.currentTimeMillis() - nowP); - h.close(); - } - } + logger.info(methodName, w.getDuccId(), "----------> Time to save", type, ":", System.currentTimeMillis() - nowP, "Size:", bytes.length, "bytes."); + } /** - * For use by the loader, load it without the existence check; the assumption this is a first-time load - * and the check isn't needed. This saves history only. + * Part of history management, recover ths indicated job from history. */ - public void saveJobUnsafe(IDuccWorkJob j) + <T> T restoreWork(Class<T> cl, String tablename, long friendly_id) throws Exception { - saveJobInternal(j, DbVertex.Job, false, DbCategory.History); - } + String methodName = "restoreWork"; + T ret = null; + DbHandle h = null; + h = dbManager.open(); + String cql = "SELECT WORK FROM " + tablename + " WHERE DUCC_DBID=" + Long.toString(friendly_id); + ResultSet rs = h.execute(cql); + for ( Row r : rs ) { + logger.info(methodName, null, "----- Restoring", friendly_id); + ByteBuffer bbWork = r.getBytes("work"); + + byte[] workbytes = bbWork.array(); + ByteArrayInputStream bais = new ByteArrayInputStream(workbytes); + ObjectInputStream ois = new ObjectInputStream(bais); + ret= (T) ois.readObject(); + ois.close(); + } + + return ret; + } + /** - * For use by normal operation: forces an existence check. This saves history only. + * Part of history management, recover ths indicated jobs from history. + * + * Reminder to self, we need to pass Clas<T> cl so compiler can infer T. */ - public void saveJob(IDuccWorkJob j) - throws Exception - { - saveJobInternal(j, DbVertex.Job, true, DbCategory.History); - } - - - private IDuccWorkJob restoreJobInternal(DbHandle h, OrientVertex v) + public <T> ArrayList<T> restoreSeveralThings(Class<T> cl, String tablename, long max) throws Exception { - IDuccWorkJob j = null; - - ODocument d = v.getRecord(); - String json = d.toJSON(); - JsonObject jo = mkJsonObject(json); + String methodName = "restoreSeveralThings"; - Gson g = DbHandle.mkGsonForJob(); - j = g.fromJson(jo, DuccWorkJob.class); - - // System.out.println(g.toJson(jo)); - - IDuccProcessMap pm = j.getProcessMap(); // seems to get set by default when job is recovered - Iterable<Edge> ed = v.getEdges(Direction.OUT, DbEdge.JpProcess.pname()); - for ( Edge e : ed ) { - OrientEdge oe = (OrientEdge) e; - OrientVertex ov = oe.getVertex(Direction.IN); - - ODocument pd = ov.getRecord(); - String pjson = pd.toJSON(); - - IDuccProcess pe = g.fromJson(pjson, DuccProcess.class); - pm.addProcess(pe); - } + ArrayList<T> ret = new ArrayList<T>(); + DbHandle h = dbManager.open(); + SimpleStatement s = new SimpleStatement("SELECT * from " + tablename + " limit " + max); + s.setFetchSize(100); + long now = System.currentTimeMillis(); - DuccWorkPopDriver driver = j.getDriver(); - if ( driver != null ) { - pm = new DuccProcessMap(); - driver.setProcessMap(pm); // seems NOT to get set when driver is reconstituted - ed = v.getEdges(Direction.OUT, DbEdge.JdProcess.pname()); - for ( Edge e : ed ) { - OrientEdge oe = (OrientEdge) e; - OrientVertex ov = oe.getVertex(Direction.IN); - - ODocument pd = ov.getRecord(); - String pjson = pd.toJSON(); - - IDuccProcess pe = g.fromJson(pjson, DuccProcess.class); - pm.addProcess(pe); + try { + int count = 0; + int nbytes = 0; + ResultSet rs = h.execute(s); + for ( Row r : rs ) { + count++; + ByteBuffer b = r.getBytes("work"); + byte[] workbytes = b.array(); + nbytes += workbytes.length; + + ByteArrayInputStream bais = new ByteArrayInputStream(workbytes); + ObjectInputStream ois = new ObjectInputStream(bais); + ret.add( (T) ois.readObject()); + ois.close(); + count++; } - } + + logger.info(methodName, null, "Found", count, "results. Total bytes", nbytes, "Time:", System.currentTimeMillis() - now); + } catch (Exception e) { + logger.error(methodName, null, "Error fetching history:", e); + } + return ret; + } - // Now must hack around becase this 'n that and JSON can't work out some things - String ct = (String) ((ADuccWork)j).getCompletionTypeObject(); - j.setCompletionType(JobCompletionType.valueOf(ct)); - String so = (String) ((ADuccWork)j).getStateObject(); - j.setJobState(JobState.valueOf(so)); - - return j; + /** + * For use by normal operation: forces an existence check. This saves history only. + */ + public void saveJob(IDuccWorkJob j) + throws Exception + { + saveWork(jobPrepare, j, true); } + /** * Part of history management, recover ths indicated job from history. */ public IDuccWorkJob restoreJob(long friendly_id) throws Exception { - DuccWorkJob ret = null; - DbHandle h = null; - try { - h = dbManager.open(); - Iterable<Vertex> q = h.select("SELECT * FROM " + DbVertex.Job.pname() + " WHERE ducc_dbid=" + friendly_id + - " AND " + DbConstants.DUCC_DBCAT + "='" + DbCategory.History.pname() + "'"); - for ( Vertex v : q ) { - // There's only 1 unless db is broken. - return restoreJobInternal(h, (OrientVertex) v); - } - } finally { - h.close(); - } - - return ret; + return (IDuccWorkJob) restoreWork(IDuccWorkJob.class, JOB_TABLE, friendly_id); } /** @@ -339,22 +280,7 @@ public class HistoryManagerDb public ArrayList<IDuccWorkJob> restoreJobs(long max) throws Exception { - ArrayList<IDuccWorkJob> ret = new ArrayList<IDuccWorkJob>(); - DbHandle h = null; - try { - h = dbManager.open(); - Iterable<Vertex> q = h.select("SELECT * FROM " + DbVertex.Job.pname() + - " where " + DbConstants.DUCC_DBCAT +"='" + DbCategory.History.pname() + - "' ORDER BY ducc_dbid DESC LIMIT "+ max); - for ( Vertex v : q ) { - IDuccWorkJob j = restoreJobInternal(h, (OrientVertex) v); - ret.add(j); - } - } finally { - h.close(); - } - - return ret; + return restoreSeveralThings(IDuccWorkJob.class, JOB_TABLE, max); } // End of jobs section // ---------------------------------------------------------------------------------------------------- @@ -363,179 +289,21 @@ public class HistoryManagerDb // ---------------------------------------------------------------------------------------------------- // Reservations section - private void saveReservationNoCommit(DbHandle h, IDuccWorkReservation r, DbCategory dbcat) - throws Exception - { - String methodName = "saveReservationNoCommit"; - long now = System.currentTimeMillis(); - - List<JdReservationBean> l = r.getJdReservationBeanList(); - if ( l != null ) { - for (JdReservationBean b : l ) { - ConcurrentHashMap<DuccId, SizeBytes> map = b.getMap(); - for ( DuccId k : map.keySet() ) { - logger.info(methodName, null, "SAVE ===> " + k.getFriendly() + " " + k.getUnique() + " : " + map.get(k)); - } - } - } - - - - - Long id = r.getDuccId().getFriendly(); - logger.info(methodName, r.getDuccId(), "Saving."); - - // Nuke the command lines - - IDuccReservationMap resmap = r.getReservationMap(); - r.setReservationMap(null); - - Gson g = DbHandle.mkGsonForJob(); - - String dbres = g.toJson(r); - // logger.info(methodName, null, "------------------- Reservation JSON: " + dbres); - - // Must repair these things because OR continues to use the job after it has been - // written to history. - r.setReservationMap(resmap); - - OrientVertex savedRes = h.saveObject(DbVertex.Reservation, id, dbres, dbcat); - - List<OrientVertex> savedHosts = new ArrayList<OrientVertex>(); - for (DuccId did : resmap.keySet()) { - Long pid = did.getFriendly(); - - IDuccReservation p = resmap.get(did); - String proc = g.toJson(p); - - savedHosts.add(h.saveObject(DbVertex.Process, pid, proc, dbcat)); - // logger.info(methodName, p.getDuccId(), "2 ----------> Time to save process", System.currentTimeMillis() - nowP); - - } - - h.addEdges(savedRes, savedHosts, DbEdge.JpProcess); - logger.info(methodName, r.getDuccId(), "----------> Total reservation save time:", System.currentTimeMillis() - now, "nPE", resmap.size()); - - } - - private void saveReservationInternal(IDuccWorkReservation r, boolean safe, DbCategory dbcat) - throws Exception - { - String methodName = "saveReservation"; - - Long id = r.getDuccId().getFriendly(); - DbHandle h = null; - try { - if ( safe ) { - h = dbManager.open(); - } else { - h = dbManager.openNoTx(); - } - if ( safe && h.thingInDatabase(id, DbVertex.Reservation, dbcat) ) { - h.close(); - return; - } - } catch ( Exception e ) { - logger.warn(methodName, r.getDuccId(), e); - h.close(); - return; - } - - try { - saveReservationNoCommit(h, r, dbcat); - } catch ( Exception e ) { - h.rollback(); - logger.error(methodName, r.getDuccId(), "Cannot store reservation:", e); - } finally { - h.commit(); - h.close(); - } - - } - // Save to history only public void saveReservation(IDuccWorkReservation r) throws Exception { - saveReservationInternal(r, true, DbCategory.History); - } - - public void saveReservationUnsafe(IDuccWorkReservation r) - throws Exception - { - saveReservationInternal(r, false, DbCategory.History); + saveWork(reservationPrepare, r, true); } - private IDuccWorkReservation restoreReservationInternal(DbHandle h, OrientVertex v) - throws Exception - { - // String methodName = "restoreReservationInternal"; - IDuccWorkReservation r = null; - - ODocument d = v.getRecord(); - String json = d.toJSON(); - JsonObject jo = mkJsonObject(json); - - Gson g = DbHandle.mkGsonForJob(); - // logger.info(methodName, null, g.toJson(jo)); - - r = g.fromJson(jo, DuccWorkReservation.class); - - //List<JdReservationBean> l = r.getJdReservationBeanList(); - //if ( l != null ) { - //for (JdReservationBean b : l ) { - //ConcurrentHashMap<DuccId, SizeBytes> map = b.getMap(); - //for ( DuccId k : map.keySet() ) { - // logger.info(methodName, null, "REST ===> " + k.getFriendly() + " " + k.getUnique() + " : " + map.get(k)); - //} - //} - //} - - IDuccReservationMap rm = r.getReservationMap(); // seems to get set by default when job is recovered - Iterable<Edge> ed = v.getEdges(Direction.OUT, DbEdge.JpProcess.pname()); - for ( Edge e : ed ) { - OrientEdge oe = (OrientEdge) e; - OrientVertex ov = oe.getVertex(Direction.IN); - - ODocument pd = ov.getRecord(); - String pjson = pd.toJSON(); - - IDuccReservation rr = g.fromJson(pjson, DuccReservation.class); - rm.addReservation(rr); - } - - // Now must hack around becase this 'n that and JSON can't work out some things - String ct = (String) ((ADuccWork)r).getCompletionTypeObject(); - r.setCompletionType(ReservationCompletionType.valueOf(ct)); - - String so = (String) ((ADuccWork)r).getStateObject(); - r.setReservationState(ReservationState.valueOf(so)); - - return r; - } - /** * Part of history management, recover ths indicated reservation from history. */ public IDuccWorkReservation restoreReservation(long duccid) throws Exception { - DuccWorkReservation ret = null; - DbHandle h = null; - try { - h = dbManager.open(); - Iterable<Vertex> q = h.select("SELECT * FROM " + DbVertex.Reservation.pname() + " WHERE ducc_dbid=" + duccid + - " AND " + DbConstants.DUCC_DBCAT +"='" + DbCategory.History.pname() + "'"); - for ( Vertex v : q ) { - // There's only 1 unless db is broken. - return restoreReservationInternal(h, (OrientVertex) v); - } - } finally { - h.close(); - } - - return ret; - } + return (IDuccWorkReservation) restoreWork(IDuccWorkReservation.class, RES_TABLE, duccid); + } /** * Part of history management, recover ths indicated reservations from history. @@ -543,44 +311,20 @@ public class HistoryManagerDb public ArrayList<IDuccWorkReservation> restoreReservations(long max) throws Exception { - ArrayList<IDuccWorkReservation> ret = new ArrayList<IDuccWorkReservation>(); - DbHandle h = null; - try { - h = dbManager.open(); - Iterable<Vertex> q = h.select("SELECT * FROM " + DbVertex.Reservation.pname() + - " WHERE " + DbConstants.DUCC_DBCAT + "='" + DbCategory.History.pname() + "'" + - " ORDER BY ducc_dbid DESC LIMIT "+ max); - for ( Vertex v : q ) { - IDuccWorkReservation j = restoreReservationInternal(h, (OrientVertex) v); - ret.add(j); - } - } finally { - h.close(); - } + return restoreSeveralThings(IDuccWorkReservation.class, RES_TABLE, max); + } - return ret; - } // End of reservations section // ---------------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------------- // Services section - // public void serviceSave(IDuccWorkService s) - // throws Exception - // { - // } public void saveService(IDuccWorkService s) throws Exception { - saveJobInternal((IDuccWorkJob)s, DbVertex.ServiceInstance, true, DbCategory.History); - } - - public void saveServiceUnsafe(IDuccWorkService s) - throws Exception - { - saveJobInternal((IDuccWorkJob)s, DbVertex.ServiceInstance, false, DbCategory.History); + saveWork(servicePrepare, s, true); } @@ -590,19 +334,7 @@ public class HistoryManagerDb public IDuccWorkService restoreService(long duccid) throws Exception { - DbHandle h = null; - try { - h = dbManager.open(); - Iterable<Vertex> q = h.select("SELECT * FROM " + DbVertex.ServiceInstance.pname() + " WHERE ducc_dbid=" + duccid + - " AND " + DbConstants.DUCC_DBCAT + "='" + DbCategory.History.pname() + "'"); - for ( Vertex v : q ) { - return restoreJobInternal(h, (OrientVertex) v); - } - } finally { - h.close(); - } - - return null; + return (IDuccWorkService) restoreWork(IDuccWorkService.class, SVC_TABLE, duccid); } /** @@ -611,23 +343,7 @@ public class HistoryManagerDb public ArrayList<IDuccWorkService> restoreServices(long max) throws Exception { - ArrayList<IDuccWorkService> ret = new ArrayList<IDuccWorkService>(); - DbHandle h = null; - try { - h = dbManager.open(); - Iterable<Vertex> q = h.select("SELECT * FROM " + DbVertex.ServiceInstance.pname() + - " WHERE " + DbConstants.DUCC_DBCAT +"='" + DbCategory.History.pname() + "'" + - " ORDER BY ducc_dbid DESC LIMIT "+ max); - for ( Vertex v : q ) { - IDuccWorkService j = restoreJobInternal(h, (OrientVertex) v); - ret.add(j); - } - } finally { - h.close(); - } - - return ret; - + return restoreSeveralThings(IDuccWorkService.class, SVC_TABLE, max); } // End of services section // ---------------------------------------------------------------------------------------------------- @@ -652,127 +368,92 @@ public class HistoryManagerDb long now = System.currentTimeMillis(); boolean ret = true; - DbHandle h = null; - try { - h = dbManager.open(); - } catch ( Exception e ) { - logger.warn(methodName, null, "Cannot open database.", e); - if ( h != null ) h.close(); - return false; - } - // We transactionally delete the old checkpoint, and then save the new one. If something gows wrong we // rollback and thus don't lose stuff. In theory. - try { - h.execute("DELETE VERTEX V where " + DbConstants.DUCC_DBCAT + "='" + DbCategory.Checkpoint.pname() + "'"); - Map<DuccId, IDuccWork> map = work.getMap(); - for (IDuccWork w : map.values() ) { - switch(w.getDuccType()) { - case Job: - saveJobNoCommit(h, (IDuccWorkJob) w, DbVertex.Job, DbCategory.Checkpoint); - break; - case Service: - saveJobNoCommit(h, (IDuccWorkJob) w, DbVertex.ServiceInstance, DbCategory.Checkpoint); - break; - case Reservation: - if ( w.getDuccId().getFriendly() == 282282 ) { - int x = 0; - x++; - } - saveReservationNoCommit(h, (IDuccWorkReservation) w, DbCategory.Checkpoint); - break; - default: - break; - } - } - - Gson g = DbHandle.mkGsonForJob(); - ProcessToJobList l = new ProcessToJobList(processToJob); - String json = g.toJson(l, l.getClass()); - // logger.info(methodName, null, "ProcessToJob:", json); - h.saveObject(DbVertex.ProcessToJob, null, json, DbCategory.Checkpoint); - h.commit(); + + // TODO: make the truncate and insert transactional + DbHandle h = dbManager.open(); + h.truncate("ducc.orckpt"); + + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(baos); + out.writeObject(work); + out.close(); + byte[] bytes = baos.toByteArray(); + ByteBuffer workbuf = ByteBuffer.wrap(bytes); + + baos = new ByteArrayOutputStream(); + out= new ObjectOutputStream(baos); + out.writeObject(processToJob); + out.close(); + bytes = baos.toByteArray(); + ByteBuffer mapbuf = ByteBuffer.wrap(bytes); + + h = dbManager.open(); + h.saveObject(ckptPrepare, 0, workbuf, mapbuf); + } catch ( Exception e ) { - if ( h != null ) h.rollback(); logger.error(methodName, null, "Cannot save ProcessToJob map", e); ret = false; } finally { - if ( h != null ) h.close(); if ( ret ) logger.info(methodName, null, "Saved Orchestrator Checkpoint"); } - logger.info(methodName, null, "Total time to save checkpoint:", System.currentTimeMillis() - now); - return ret; + logger.info(methodName, null, "Total time to save checkpoint:", System.currentTimeMillis() - now); + return ret; } /** * Orchestrator checkpoint. Restore the checkpoint from the DB. Caller must initialize * empty maps, which we fill in. */ - public boolean restore(DuccWorkMap work, Map<DuccId, DuccId> processToJob) + public Pair<DuccWorkMap, Map<DuccId, DuccId>> restore() throws Exception { - String methodName = "restore"; + String methodName = "restore"; DbHandle h = null; - boolean ret = true; + Pair<DuccWorkMap, Map<DuccId, DuccId>> ret = new Pair<DuccWorkMap, Map<DuccId, DuccId>>(); try { h = dbManager.open(); - // Select all the "top level" objects ith DUCC_LIVE=true. When they get restored the - // attached object will get collected. - - Iterable<Vertex> q = h.select("SELECT * FROM V WHERE (" + - "@CLASS ='" + DbVertex.Job.pname() + - "' OR " + - "@CLASS ='" + DbVertex.Reservation.pname() + - "' OR " + - "@CLASS ='" + DbVertex.ServiceInstance.pname() + - "') AND " + DbConstants.DUCC_DBCAT + "='" + DbCategory.Checkpoint.pname() + "'"); - - IDuccWork w = null; - for ( Vertex v : q ) { - String l = ((OrientVertex) v).getLabel(); - if ( l.equals(DbVertex.Job.pname()) || l.equals(DbVertex.ServiceInstance.pname()) ) { - w = restoreJobInternal(h, (OrientVertex) v); - } else if ( l.equals(DbVertex.Reservation.pname()) ) { - w = restoreReservationInternal(h, (OrientVertex) v); - } else { - logger.warn(methodName, null, "Unexpected record of type", l, "in checkpoint restore."); + String cql = "SELECT * FROM ducc.orckpt WHERE id=0"; + ResultSet rs = h.execute(cql); + for ( Row r : rs ) { + logger.info(methodName, null, "Found checkpoint."); + ByteBuffer bbWork = r.getBytes("work"); + ByteBuffer bbmap = r.getBytes("p2jmap"); + + byte[] workbytes = bbWork.array(); + ByteArrayInputStream bais = new ByteArrayInputStream(workbytes); + ObjectInputStream ois = new ObjectInputStream(bais); + DuccWorkMap work = (DuccWorkMap) ois.readObject(); + Map<DuccId, IDuccWork> map = work.getMap(); + ois.close(); + + workbytes = bbmap.array(); + bais = new ByteArrayInputStream(workbytes); + ois = new ObjectInputStream(bais); + Map<DuccId, DuccId> processToJob = (Map<DuccId, DuccId>) ois.readObject(); + ois.close(); + + // hack because java serializion is stupid and won't call the no-args constructor - need + // to restore sometransient fields + Set<DuccId> ids = work.getReservationKeySet(); + for ( DuccId id : ids ) { + DuccWorkReservation res = (DuccWorkReservation) work.findDuccWork(DuccType.Reservation, ""+id.getFriendly()); + if ( r != null ) res.initLogger(); } - work.addDuccWork(w); + ret = new Pair(work, processToJob); } - q = h.select("SELECT FROM " + DbVertex.ProcessToJob.pname()); - - int count = 0; - for ( Vertex vv : q ) { - if ( count > 1 ) { - logger.error(methodName, null, "Oops - we have multiple ProcessToJob records. Using the first one but it may be wrong."); - break; - } - - OrientVertex v = (OrientVertex) vv; - ODocument d = v.getRecord(); - String json = d.toJSON(); - logger.info(methodName, null, json); - - Gson g = DbHandle.mkGsonForJob(); - - ProcessToJobList l = g.fromJson(json, ProcessToJobList.class); - l.fill(processToJob); - } - - } catch ( Exception e ) { + } catch ( Exception e ) { logger.error(methodName, null, "Error restoring checkpoint:", e); - ret = false; - } finally { - if ( h != null ) h.close(); - } - - + } + return ret; } - + // End of OR checkpoint save and restore // ---------------------------------------------------------------------------------------------------- Added: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/ReadCkpt.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/ReadCkpt.java?rev=1711088&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/ReadCkpt.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/ReadCkpt.java Wed Oct 28 18:12:53 2015 @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +*/ +package org.apache.uima.ducc.database; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.uima.ducc.common.utils.DuccLogger; +import org.apache.uima.ducc.common.utils.id.DuccId; +import org.apache.uima.ducc.transport.event.common.DuccWorkMap; + +/** + * A test routine, read and verify the OR checkpoint file. + */ + +public class ReadCkpt +{ + + DuccLogger logger = DuccLogger.getLogger(DbLoader.class, "DBLOAD"); + String DUCC_HOME; + DbManager dbManager = null; + HistoryManagerDb hmd = null; + + public ReadCkpt() + throws Exception + { + DUCC_HOME = System.getProperty("DUCC_HOME"); + if ( DUCC_HOME == null ) { + System.out.println("System proprety -DDUCC_HOME must be set."); + System.exit(1); + } + + String state_url = "bluej538"; + System.setProperty("ducc.state.database.url", state_url); + + dbManager = new DbManager(state_url, logger); + dbManager.init(); + + hmd = new HistoryManagerDb(); + } + + public void run() + { + DuccWorkMap work = new DuccWorkMap(); + Map<DuccId, DuccId> processToJob = new HashMap<DuccId, DuccId>(); + + } + + public static void main(String [] args) + { + try { + ReadCkpt rc = new ReadCkpt(); + rc.run(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + + } + +} Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java?rev=1711088&r1=1711087&r2=1711088&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java Wed Oct 28 18:12:53 2015 @@ -19,17 +19,16 @@ package org.apache.uima.ducc.database; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; import org.apache.uima.ducc.common.persistence.rm.IRmPersistence; import org.apache.uima.ducc.common.utils.DuccLogger; -import org.apache.uima.ducc.database.DbConstants.DbCategory; -import org.apache.uima.ducc.database.DbConstants.DbVertex; -import com.google.gson.Gson; -import com.tinkerpop.blueprints.impls.orient.OrientVertex; +import com.datastax.driver.core.SimpleStatement; /** * Manage saving and fetching of transient RM state. The primary consumer is @@ -67,6 +66,11 @@ public class RmStatePersistence init(stateUrl); } + public void shutdown() + { + dbManager.shutdown(); + } + public void clear() throws Exception { @@ -74,41 +78,55 @@ public class RmStatePersistence DbHandle h = null; try { h = dbManager.open(); - h.execute("DELETE VERTEX V where " + DbConstants.DUCC_DBCAT + "='" + DbCategory.RmState.pname() + "'"); + h.execute("TRUNCATE ducc.rmnodes"); } catch ( Exception e ) { logger.error(methodName, null, "Cannot clear the database.", e); - } finally { - if ( h != null ) h.close(); - } + } } - public String toGson(Object o) + static List<SimpleStatement> mkSchema() + throws Exception { - // We need to define Instance creators and such so we do it in a common place - Gson g = DbHandle.mkGsonForJob(); - return g.toJson(o); + List<SimpleStatement> ret = new ArrayList<SimpleStatement>(); + + StringBuffer buf = new StringBuffer("CREATE TABLE IF NOT EXISTS ducc." + RmProperty.TABLE_NAME.pname() + " ("); + buf.append(DbUtil.mkSchema(RmProperty.values())); + buf.append(")"); + ret.add(new SimpleStatement(buf.toString())); + return ret; } - public Object createMachine(String m, Properties props) + // static String[] mkSchemaItems() + // { + // int size = RmProperty.values().length; + // String[] ret = new String[size]; + // int ndx = 0; + + // for ( RmProperty n: RmProperty.values() ) { + // String s = n.pname(); + // s = s + " " + DbUtil.typeToString(n.type()); + // if ( n.isPrimaryKey() ) { + // s = s + " PRIMARY KEY"; + // } + // ret[ndx++] = s; + // } + // return ret; + // } + + public void createMachine(String m, Map<RmProperty, Object> props) throws Exception { String methodName = "createMachine"; DbHandle h = dbManager.open(); - Object ret = null; try { - OrientVertex v = h.createProperties(DbConstants.DUCC_DBNODE, m, DbVertex.RmNode, DbCategory.RmState, props); - ret = v.getId(); - h.commit(); + String cql = DbUtil.mkInsert("ducc.rmnodes", props); + h.execute(cql); } catch ( Exception e ) { - logger.error(methodName, null, "Update", m, "ROLLBACK: ", e); - if ( h != null ) h.rollback(); - } finally { - if ( h != null ) h.close(); - } - return ret; + logger.error(methodName, null, "Error creating new record:", e); + } } - public void setProperties(Object dbid, String dbk, Object... props) + public void setProperties(String node, Object... props) throws Exception { String methodName = "setProperties"; @@ -121,37 +139,28 @@ public class RmStatePersistence DbHandle h = dbManager.open(); try { - h.updateProperties(dbid, props); - h.commit(); + h.updateProperties("ducc.rmnodes", "WHERE name='" + node + "'", props); } catch ( Exception e ) { - logger.error(methodName, null, "Update", dbk, "ROLLBACK: ", e); - if ( h != null ) h.rollback(); - } finally { - if ( h != null ) h.close(); - logger.info(methodName, null, "Total time to update properties on", dbid.toString(), System.currentTimeMillis() - now); + logger.error(methodName, null, "Problem setting properties"); + } finally { + logger.info(methodName, null, "Total time to update properties on", System.currentTimeMillis() - now); } } - public void setProperty(Object dbid, String dbk, RmPropName k, Object v) + public void setProperty(String node, RmProperty k, Object v) throws Exception { String methodName = "setProperty"; - long now = System.currentTimeMillis(); DbHandle h = dbManager.open(); try { - h.updateProperty(dbid, k.pname(), v); - h.commit(); + h.updateProperty("ducc.rmnodes", "name='" + node + "'", k.columnName(), v); } catch ( Exception e ) { - logger.error(methodName, null, "Update", dbk, "ROLLBACK: ", e); - if ( h != null ) h.rollback(); - } finally { - if ( h != null ) h.close(); - logger.info(methodName, null, "Total time to update property on", dbid.toString(), System.currentTimeMillis() - now); - } + logger.error(methodName, null, "Problem setting properties."); + } } @@ -172,28 +181,3 @@ public class RmStatePersistence } } - -/** - String name; - String nodepoolId; - long memory; - int order; - boolean blacklisted; // UIMA-4142 - boolean online; // UIMA-4234 - boolean responsive; // UIMA-4234 - - - - - Properties file for a node - name = string - ip = string - state = <state> - states: vary status: online | offline - reporting : present | absent - nodepool = string - quantum = string - class = string - scheduling policy = string - scheduled work = list of duccids of work on the node - */