Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java?rev=1711088&r1=1711087&r2=1711088&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java
 Wed Oct 28 18:12:53 2015
@@ -20,180 +20,134 @@
 package org.apache.uima.ducc.database;
 
 import java.io.FileInputStream;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Properties;
 
 import org.apache.uima.ducc.common.utils.DuccLogger;
-import org.apache.uima.ducc.database.DbConstants.DbEdge;
-import org.apache.uima.ducc.database.DbConstants.DbVertex;
-
-import com.orientechnologies.orient.client.remote.OServerAdmin;
-import com.orientechnologies.orient.core.intent.OIntent;
-import com.tinkerpop.blueprints.impls.orient.OrientGraph;
-import com.tinkerpop.blueprints.impls.orient.OrientGraphFactory;
-import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx;
 
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+
+/**
+ * Provide a common point for contacting the db, acquiring sessions/handles to 
it, managing the db,
+ * closing, etc.
+ */
 public class DbManager
 {
 
     String dburl;
     DuccLogger logger;
 
-    OrientGraphFactory  factory;
-
-    Map<String, DbVertex> name_to_vertex = new HashMap<String, DbVertex>();
-    Map<String, DbEdge>   name_to_edge   = new HashMap<String, DbEdge>();
-
-    // private ODatabaseDocumentTx documentDb;
+    private Cluster cluster;            // only one
+    private Session session;            // only one - it's thread safe and 
manages a connection pool
 
+    /**
+     * Records the database contact point and the logger.  No connection is made here;
+     * the cluster is built later by init().
+     *
+     * @param dburl  Contact point handed to the cluster builder in init().
+     * @param logger Logger used for the messages issued by this manager.
+     */
     public DbManager(String dburl, DuccLogger logger)
         throws Exception
     {
         this.dburl = dburl;
         this.logger = logger;
-
-        for ( DbVertex o :  DbVertex.values() ) {
-            name_to_vertex.put(o.pname(), o);
-        }
-        for ( DbEdge o :  DbEdge.values() ) {
-            name_to_edge.put(o.pname(), o);
-        }
-
     }
-
-    DbVertex vertexType(String v)
-    {
-        return name_to_vertex.get(v);
-    }
-
-    DbEdge EdgeType(String e)
-    {
-        return name_to_edge.get(e);
-    }
-
     
+    /**
+     * Placeholder: the existence check is not implemented.  It only logs a warning
+     * and reports success unconditionally.
+     *
+     * @return Always true.
+     */
     boolean checkForDatabase()
        throws Exception
     {
         String methodName = "checkForDatabase";        
-        String pw = dbPassword();
-        OServerAdmin admin = null;
-
-        boolean ret = true;
-        try {
-            admin = new OServerAdmin(dburl);
-            admin.connect("root", pw);               // connect to the server
-                       
-            if ( ! admin.existsDatabase("plocal") ) {
-                logger.info(methodName, null, "Database " + dburl + " does not 
exist.");
-                ret = false;
-            }
-        } finally {
-            if ( admin != null) admin.close();
-        }
-        return ret;
+        logger.warn(methodName, null, "Not yet implemented.");
+        return true;
     }
 
+    /**
+     * Placeholder: dropping the database is not implemented.  Nothing is removed;
+     * a warning is logged and the call returns normally.
+     */
     public void drop()
         throws Exception
     {
-        if ( dburl.startsWith("remote") ) {
-            OServerAdmin admin = null;
-            try {
-                String pw = dbPassword();
-               admin = new OServerAdmin(dburl);
-                admin.connect("root", pw);               // connect to the 
server
-                admin.dropDatabase("plocal");           
-                admin.close();
-            } finally {
-                if ( admin != null ) admin.close();
-            }
-        } else  {
-            OrientGraphNoTx graphDb = factory.getNoTx();        // the graph 
instance
-            if ( graphDb == null ) {
-                throw new IllegalStateException("Cannot allocate graph 
instance for " + dburl);
-            }
-            graphDb.drop();
-        }
+        String methodName = "drop";
+        logger.warn(methodName, null, "Drop is not implemented yet.");
     }
 
     public synchronized DbHandle open()
         throws Exception
     {
-       OrientGraph graphDb = factory.getTx();        // the graph instance
-        if ( graphDb == null ) {
-            throw new IllegalStateException("Cannot allocate graph instance 
for " + dburl);
+        if ( session == null ) {
+            session = cluster.connect();
         }
-        
-        graphDb.setUseLightweightEdges(true);
-        return new DbHandle(this, graphDb);
+
+        return new DbHandle(this);
     }
 
-    public synchronized DbHandle openNoLog()
-        throws Exception
+       /**
+        * Build the cluster object for the configured contact point and log the cluster
+        * name and every host reported by its metadata.  A second call is a no-op.
+        */
+       public synchronized void init()
+       throws Exception
     {
-       OrientGraph graphDb = factory.getTx();        // the graph instance
-        if ( graphDb == null ) {
-            throw new IllegalStateException("Cannot allocate graph instance 
for " + dburl);
-        }
+        String methodName = "init";
+
+        if ( cluster != null ) return;        // already initialized
+
+        cluster = Cluster.builder()
+            .addContactPoint(dburl)
+            .build();
+
+        Metadata metadata = cluster.getMetadata();
+        // Message pieces are passed as separate arguments, as everywhere else in this file.
+        // NOTE(review): assumes the logger does not expand printf-style directives - confirm.
+        logger.info(methodName, null, "Connected to cluster:", metadata.getClusterName());
         
-        graphDb.setUseLightweightEdges(true);
-        graphDb.getRawGraph().getTransaction().setUsingLog(false);
-        return new DbHandle(this, graphDb);
+        for ( Host host : metadata.getAllHosts() ) {
+            logger.info(methodName, null, "Datacenter:", host.getDatacenter(), "Host:", host.getAddress(), "Rack:", host.getRack());
+        } 
     }
 
-
-    public synchronized DbHandle openNoTx()
-        throws Exception
+    /**
+     * Close the cluster, if one was ever built, and forget both the cluster and the
+     * session so that a later init() / open() starts from scratch.  Safe to call when
+     * init() was never called or failed before the cluster was built.
+     */
+    public synchronized void shutdown()
     {
-       OrientGraphNoTx graphDb = factory.getNoTx();        // the graph 
instance
-        if ( graphDb == null ) {
-            throw new IllegalStateException("Cannot allocate graph instance 
for " + dburl);
-        }
-        graphDb.setUseLightweightEdges(true);        
-        return new DbHandle(this, graphDb);
+        String methodName = "closeDatabase";
+        logger.info(methodName, null, "Closing the database.");
+        try {
+            if ( cluster != null ) {
+                cluster.close();
+            }
+        } finally {
+            // Reset even when close() throws, so a stale cluster is never reused
+            cluster = null;
+            session = null;
+        }
     }
 
-
-       public synchronized void init()
-       throws Exception
+    /**
+     * Prepare a CQL statement on the shared session.
+     * NOTE(review): session is only assigned in open(); presumably callers always
+     * open() first - confirm.
+     *
+     * @param cql The CQL text to prepare.
+     * @return The prepared statement produced by the session.
+     */
+    PreparedStatement prepare(String cql)
     {
-        String methodName = "init";
+        return session.prepare(cql);
+    }
 
-        if ( factory != null ) return;        // already initialized
+    /**
+     * Remove all rows of a table by issuing a TRUNCATE statement.
+     *
+     * @param table Name of the table; it is placed into the statement text as given.
+     */
+    void truncate(String table)
+        throws Exception
+    {
+        String cql = "TRUNCATE " + table;
+        execute(cql);
+    }
 
-        if ( ! dburl.startsWith("plocal") ) {
-            // make sure the server is up if it's not a plocal db
-            if ( !checkForDatabase() ) {
-                throw new IllegalStateException("Database does not exist and 
must be created:" + dburl);
-            }
+    /**
+     * Shorten text for logging: strings longer than 200 characters are cut to their
+     * first 200 characters followed by " ... "; shorter strings are returned as is.
+     */
+    String truncateText(String s)
+    {
+        final int limit = 200;
+        if ( s.length() <= limit ) {
+            return s;
         }
-
-        factory = new OrientGraphFactory(dburl);
-        if ( factory == null ) {
-            throw new IllegalStateException("Cannot create graph factory for " 
+ dburl);
-        } 
-        logger.info(methodName, null, "Database is opened:", dburl);
-        factory.setupPool(1,20);        
+        return s.substring(0, limit) + " ... ";
     }
 
-    public synchronized void declareIntent(OIntent intent)
+    /**
+     * Log and run a CQL string on the shared session.  The full statement text is
+     * logged when debug logging is enabled, a shortened form otherwise.
+     */
+    ResultSet execute(String cql)
     {
-        factory.declareIntent(intent);
+        String shown = logger.isDebug() ? cql : truncateText(cql);
+        logger.info("execute", null, "EXECUTE CQL:", shown);
+        return session.execute(cql);
     }
 
-    public synchronized void shutdown()
+    /**
+     * Run a bound (prepared) statement on the shared session.  Unlike the other
+     * execute variants, nothing is logged here.
+     */
+    ResultSet execute(BoundStatement s)
     {
-       String methodName = "closeDatabase";
-        logger.info(methodName, null, "Closing the database.");
-        if ( factory != null ) {
-            // closes all pooled instances and stops the factory
-            factory.close();
-            factory = null;
-        }        
+        return session.execute(s);
     }
 
+    /**
+     * Log a shortened form of the statement's query string, then run the statement
+     * on the shared session.
+     */
+    ResultSet execute(SimpleStatement s)
+    {
+        String shortened = truncateText(s.getQueryString());
+        logger.info("execute", null, "EXECUTE STATEMENT:", shortened);
+        return session.execute(s);
+    }
 
     static String dbPassword()
        throws Exception

Added: 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java?rev=1711088&view=auto
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java
 (added)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java
 Wed Oct 28 18:12:53 2015
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+package org.apache.uima.ducc.database;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.uima.ducc.common.persistence.IDbProperty;
+import org.apache.uima.ducc.common.persistence.IDbProperty.Type;
+
+/**
+ * Static common helper methods.
+ *
+ * Not public at this point, would prefer to encapsulate all this entirely in 
DB.
+ */
+class DbUtil
+{
+    /**
+     * Build the column list of a table definition from a set of properties.
+     * Properties flagged as meta are skipped.  Each remaining property contributes
+     * "name type," and the primary-key columns are collected into a trailing
+     * " PRIMARY KEY(k1,k2)" clause.
+     *
+     * @param props The properties describing the table.
+     * @return The column definitions followed by the primary key clause.
+     * @throws IllegalArgumentException if no property is a primary key.
+     */
+    static String mkSchema(IDbProperty[] props)
+        throws Exception
+    {
+        StringBuilder columns = new StringBuilder();
+        StringBuilder keys = new StringBuilder();
+        int nkeys = 0;
+        for ( IDbProperty prop : props ) {
+            if ( prop.isMeta() ) continue;
+
+            String column = prop.columnName();
+            columns.append(column).append(" ").append(typeToString(prop.type())).append(",");
+
+            if ( prop.isPrimaryKey() ) {
+                if ( nkeys++ > 0 ) {
+                    keys.append(",");
+                }
+                keys.append(column);
+            }
+        }
+        if ( nkeys == 0 ) {
+            throw new IllegalArgumentException("Schema properties must declare at least one primary key.");
+        }
+        return columns + " PRIMARY KEY(" + keys + ")";
+    }
+
+    /**
+     * Append a parenthesized, comma-separated field list to the buffer.
+     *
+     * @param buf    Buffer to append to; whatever it already holds is kept.
+     * @param fields The fields to list.
+     * @return The full content of the buffer after the list has been appended.
+     */
+    static String mkFields(StringBuffer buf, String[] fields)
+    {
+        buf.append("(");
+        for ( int i = 0; i < fields.length; i++ ) {
+            if ( i > 0 ) buf.append(", ");
+            buf.append(fields[i]);
+        }
+        buf.append(")");
+        return buf.toString();
+    }
+
+    /**
+     * Build a CREATE TABLE IF NOT EXISTS statement.  The fields are used verbatim,
+     * so the caller must already have qualified them with types, key attributes, etc.
+     *
+     * @param tableName Name of the table to create.
+     * @param fields    The already-qualified field definitions.
+     *
+     * @return The text of the statement.
+     */
+    static String mkTableCreate(String tableName, String[] fields)
+    {
+        StringBuilder stmt = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
+        stmt.append(tableName).append(" (");
+        for ( int i = 0; i < fields.length; i++ ) {
+            if ( i > 0 ) stmt.append(", ");
+            stmt.append(fields[i]);
+        }
+        return stmt.append(")").toString();
+    }
+
+    /**
+     * Build an INSERT statement with one column per map entry.  Each value is
+     * rendered through rep() according to the type of its property.
+     *
+     * @param tableName Table to insert into.
+     * @param props     Column properties mapped to the values to store.
+     * @return The text of the statement.
+     */
+    static String mkInsert(String tableName, Map<? extends IDbProperty, Object> props)
+    {
+        StringBuilder columns = new StringBuilder();
+        StringBuilder values = new StringBuilder();
+        boolean first = true;
+        for ( Map.Entry<? extends IDbProperty, Object> entry : props.entrySet() ) {
+            if ( !first ) {
+                columns.append(",");
+                values.append(",");
+            }
+            first = false;
+
+            IDbProperty prop = entry.getKey();
+            columns.append(prop.columnName());
+            values.append(rep(prop.type(), entry.getValue()));
+        }
+        return "INSERT INTO " + tableName + "(" + columns + ") VALUES (" + values + ")";
+    }
+
+    /**
+     * Build an INSERT statement whose first column is an explicit key, followed by
+     * one column per map entry.
+     *
+     * The key value is written with toString() exactly as given (it is not passed
+     * through rep()), so the caller must supply it already in CQL form.  The map
+     * values are rendered through rep() according to the type of their property.
+     *
+     * @param tableName Table to insert into.
+     * @param key       Name of the key column.
+     * @param keyval    Value of the key column, already in CQL form.
+     * @param props     Remaining column properties mapped to their values; may be empty.
+     * @return The text of the statement.
+     */
+    static String mkInsert(String tableName, Object key, Object keyval, Map<? extends IDbProperty, Object> props)
+    {
+        StringBuffer buf = new StringBuffer("INSERT INTO ");
+        buf.append(tableName);
+        buf.append("(");
+        buf.append(key.toString());
+
+        StringBuffer vals = new StringBuffer(") VALUES (");
+        vals.append(keyval.toString());
+
+        // The key is always present, so every property is preceded by a separator.
+        // Emitting the comma in front avoids the trailing comma that made the
+        // generated statement invalid.
+        for ( Map.Entry<? extends IDbProperty, Object> entry : props.entrySet() ) {
+            IDbProperty prop = entry.getKey();
+            buf.append(",");
+            buf.append(prop.columnName());
+            vals.append(",");
+            vals.append(rep(prop.type(), entry.getValue()));
+        }
+        buf.append(vals.toString());
+        buf.append(")");
+
+        return buf.toString();
+    }
+
+    /**
+     * Build an UPDATE statement from alternating property / value arguments.
+     *
+     * Primary key properties are accepted but left out of the SET clause, since they
+     * may not be updated; this lets callers pass everything they have.  If every
+     * property is a primary key the SET clause comes out empty.
+     *
+     * @param table Table to update.
+     * @param key   Condition placed verbatim after WHERE.
+     * @param props Alternating IDbProperty and value: prop0, val0, prop1, val1, ...
+     * @return The text of the statement.
+     * @throws IllegalArgumentException if props does not hold complete pairs.
+     */
+    static String mkUpdate(String table, String key, Object... props)
+    {
+        int len = props.length;
+        if ( (len % 2) != 0 ) {
+            throw new IllegalArgumentException("Update properties must be property/value pairs, found " + len + " arguments.");
+        }
+
+        StringBuffer buf = new StringBuffer("UPDATE ");
+        buf.append(table);
+        buf.append(" SET ");
+
+        boolean first = true;
+        for ( int i = 0; i < len; i+=2) {
+            IDbProperty prop = (IDbProperty) props[i];
+            if ( prop.isPrimaryKey() ) continue;          // not allowed to update this
+
+            // Separate from the previous assignment actually emitted, so a skipped
+            // primary key in last position cannot leave a dangling comma.
+            if ( !first ) {
+                buf.append(",");
+            }
+            first = false;
+
+            buf.append(prop.columnName());
+            buf.append("=");
+            buf.append(rep(prop.type(), props[i+1]));
+        }
+        buf.append(" WHERE ");
+        buf.append(key);
+        return buf.toString();
+    }
+
+    /**
+     * Return the CQL literal for a value of the indicated type.
+     *
+     * String values are wrapped in single quotes, and any single quote inside the
+     * value is doubled so that the literal stays well formed.  Every other type is
+     * rendered with toString().
+     *
+     * @param t   The type of the column the value is destined for.
+     * @param val The value; must not be null.
+     * @return The text to place into the statement.
+     */
+    static String rep(Type t, Object val)
+    {
+        switch ( t ) {
+        case String:
+            return "'" + val.toString().replace("'", "''") + "'";
+        default:
+            return val.toString();
+        }
+
+    }
+
+    /**
+     * Map the generic property type onto the name this database uses for it in a
+     * schema.  The mapping is kept here, rather than on the Type enum, so that the
+     * database specifics stay hidden from the rest of the code.
+     *
+     * @throws IllegalArgumentException if the type has no mapping.
+     */
+    static String typeToString(Type t)
+    {
+        switch ( t ) {
+            case Blob:    return "blob";
+            case String:  return "text";
+            case Boolean: return "boolean";
+            case Integer: return "int";
+            case Long:    return "bigint";
+            case Double:  return "double";
+            case UUID:    return "uuid";
+            default:
+                throw new IllegalArgumentException("Unrecognized type for schema: " + t);
+        }
+    }
+
+}

Added: 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbVerify.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbVerify.java?rev=1711088&view=auto
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbVerify.java
 (added)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbVerify.java
 Wed Oct 28 18:12:53 2015
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.uima.ducc.database;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.uima.ducc.common.persistence.services.StateServicesDirectory;
+import org.apache.uima.ducc.common.persistence.services.StateServicesSet;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SimpleStatement;
+
+/**
+ * Toy verifier that reads DUCC history and service records back from the database.
+ */
+
+public class DbVerify
+{
+    // Logger for this class (was created for DbLoader, which mislabels the output)
+    DuccLogger logger = DuccLogger.getLogger(DbVerify.class, "DBVERIFY");
+    String DUCC_HOME;
+
+    DbManager dbManager = null;
+    long total_bytes = 0;
+
+    /**
+     * Pick up DUCC_HOME from the system properties.  When it is not set, a message
+     * is printed and the JVM exits with status 1.
+     */
+    public DbVerify()
+        throws Exception
+    {
+        String home = System.getProperty("DUCC_HOME");
+        if ( home == null ) {
+            System.out.println("System proprety -DDUCC_HOME must be set.");
+            System.exit(1);
+        }
+        DUCC_HOME = home;
+    }
+
+
+    /**
+     * Read every row of a table, deserialize the object stored in its "work" column
+     * and log what was found, then log the row count, byte count and elapsed time.
+     *
+     * A failure while reading is printed and ends the scan early; it is not
+     * propagated, so the summary lines are still logged.
+     *
+     * @param table The (keyspace-qualified) table to scan.
+     */
+    void verify(String table)
+       throws Exception
+    {
+        String methodName = "verify";
+        DbHandle h = dbManager.open();
+        // For test and debug, append " LIMIT 10" to the query.
+        SimpleStatement s = new SimpleStatement("SELECT * from " + table);
+        logger.info(methodName, null, "Fetch size", s.getFetchSize());
+        s.setFetchSize(100);
+        long now = System.currentTimeMillis();
+
+        int counter = 0;
+        long nbytes = 0;                 // long: the sum over a whole table can exceed an int
+        try {
+            ResultSet rs = h.execute(s);
+            for ( Row r : rs ) {
+                counter++;
+
+                // Copy only the readable region of the buffer.  ByteBuffer.array() returns
+                // the entire backing array, ignoring the buffer's offset and limit, so it
+                // can include bytes that are not part of this value.
+                ByteBuffer b = r.getBytes("work");
+                byte[] bytes = new byte[b.remaining()];
+                b.duplicate().get(bytes);
+                nbytes += bytes.length;
+                total_bytes += bytes.length;
+
+                Object o;
+                ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
+                try {
+                    o = ois.readObject();
+                } finally {
+                    ois.close();
+                }
+                DuccId did = new DuccId(r.getLong("ducc_dbid"));
+
+                logger.info(methodName, did, "found object class", o.getClass().getName(), "of type", r.getString("type"), "in table", table, "of size", bytes.length);
+            }
+        } catch (Exception e) {
+            // Report and carry on to the summary below
+            e.printStackTrace();
+        }
+
+        logger.info(methodName, null, "Found", counter, "results. Total bytes", nbytes);
+        logger.info(methodName, null, "Total time for", table, System.currentTimeMillis() - now);
+    }
+    
+    /**
+     * Fetch the archived and then the live services from the services store, log
+     * one line per service found, and finish with the two totals.
+     */
+    void verifyServices()
+       throws Exception
+    {
+        // Label log lines with this method's own name so they can be told apart
+        // from those written by verify()
+        String methodName = "verifyServices";
+        int live = 0;
+        int archived = 0;
+        StateServicesDb sdb = new StateServicesDb();
+        sdb.init(logger,dbManager);
+
+        StateServicesDirectory ssd = sdb.fetchServices(true);          // first the archived stuff
+        Map<Long, StateServicesSet>  svcmap = ssd.getMap();
+        for ( Long id : svcmap.keySet() ) {
+            DuccId did = new DuccId(id);
+            archived++;
+            logger.info(methodName, did, "Found an archived service.");
+        }
+
+        ssd = sdb.fetchServices(false);          // then the live stuff
+        svcmap = ssd.getMap();
+        for ( Long id : svcmap.keySet() ) {
+            DuccId did = new DuccId(id);
+            logger.info(methodName, did, "Found a live service.");
+            live++;
+        }
+        logger.info(methodName, null, "Found", live, "live services and", archived, "archived services.");
+
+    }
+
+    /**
+     * Connect to the database, run the service verification and always shut the
+     * connection down again, then log the bytes read and the elapsed time.
+     * The table verifications are present but disabled.
+     */
+    void run()
+        throws Exception
+    {
+        String methodName = "run";
+        long now = System.currentTimeMillis();
+        String state_url = "bluej538";     // NOTE(review): hard-coded contact point - presumably a test host, confirm
+        try {
+            dbManager = new DbManager(state_url, logger);
+            dbManager.init();
+
+            verifyServices();
+
+            if ( false ) verify("ducc.res_history");
+            if ( false ) verify("ducc.svc_history");
+            if ( false ) verify("ducc.job_history");
+
+        } finally {
+            // The manager may never have been created; do not mask the original
+            // failure with a NullPointerException
+            if ( dbManager != null ) {
+                dbManager.shutdown();
+            }
+        }
+        logger.info(methodName, null, "Read", total_bytes, "bytes in",  System.currentTimeMillis() - now, "MS");
+    }
+
+    
+    /**
+     * Command line entry point: create a verifier and run it.  Any exception is
+     * printed rather than propagated.
+     */
+    public static void main(String[] args)
+    {
+        try {
+            new DbVerify().run();
+        } catch ( Exception e  ) {
+            e.printStackTrace();
+        }
+    }
+}

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java?rev=1711088&r1=1711087&r2=1711088&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java
 Wed Oct 28 18:12:53 2015
@@ -18,52 +18,33 @@
 */
 package org.apache.uima.ducc.database;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Set;
 
-import org.apache.uima.ducc.common.SizeBytes;
-import org.apache.uima.ducc.common.main.DuccService;
+import org.apache.uima.ducc.common.Pair;
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.id.DuccId;
-import org.apache.uima.ducc.database.DbConstants.DbCategory;
-import org.apache.uima.ducc.database.DbConstants.DbEdge;
-import org.apache.uima.ducc.database.DbConstants.DbVertex;
-import org.apache.uima.ducc.transport.event.common.ADuccWork;
-import org.apache.uima.ducc.transport.event.common.DuccProcess;
-import org.apache.uima.ducc.transport.event.common.DuccProcessMap;
-import org.apache.uima.ducc.transport.event.common.DuccReservation;
-import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
 import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
-import org.apache.uima.ducc.transport.event.common.DuccWorkPopDriver;
 import org.apache.uima.ducc.transport.event.common.DuccWorkReservation;
-import 
org.apache.uima.ducc.transport.event.common.IDuccCompletionType.JobCompletionType;
-import 
org.apache.uima.ducc.transport.event.common.IDuccCompletionType.ReservationCompletionType;
-import org.apache.uima.ducc.transport.event.common.IDuccPerWorkItemStatistics;
-import org.apache.uima.ducc.transport.event.common.IDuccProcess;
-import org.apache.uima.ducc.transport.event.common.IDuccProcessMap;
-import org.apache.uima.ducc.transport.event.common.IDuccReservation;
-import org.apache.uima.ducc.transport.event.common.IDuccReservationMap;
-import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
-import org.apache.uima.ducc.transport.event.common.IDuccState.ReservationState;
+import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType;
 import org.apache.uima.ducc.transport.event.common.IDuccWork;
 import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
 import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation;
 import org.apache.uima.ducc.transport.event.common.IDuccWorkService;
-import org.apache.uima.ducc.transport.event.common.JdReservationBean;
 import 
org.apache.uima.ducc.transport.event.common.history.IHistoryPersistenceManager;
 
-import com.google.gson.Gson;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SimpleStatement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
-import com.orientechnologies.orient.core.intent.OIntentMassiveInsert;
-import com.orientechnologies.orient.core.record.impl.ODocument;
-import com.tinkerpop.blueprints.Direction;
-import com.tinkerpop.blueprints.Edge;
-import com.tinkerpop.blueprints.Vertex;
-import com.tinkerpop.blueprints.impls.orient.OrientEdge;
-import com.tinkerpop.blueprints.impls.orient.OrientVertex;
 
 
 public class HistoryManagerDb 
@@ -72,265 +53,225 @@ public class HistoryManagerDb
 
        
        private DuccLogger logger = null;
-    private String dburl;
     private DbManager dbManager;
+
+    PreparedStatement jobPrepare = null;
+    PreparedStatement reservationPrepare = null;
+    PreparedStatement servicePrepare = null;
+    PreparedStatement ckptPrepare = null;
+    static final String JOB_TABLE = "ducc." + OrWorkProps.JOB_TABLE.pname();
+    static final String RES_TABLE = "ducc." + 
OrWorkProps.RESERVATION_TABLE.pname();
+    static final String SVC_TABLE = "ducc." + 
OrWorkProps.SERVICE_TABLE.pname();
+    static final String CKPT_TABLE = "ducc." + OrCkptProps.CKPT_TABLE.pname();
                
     public HistoryManagerDb()
     {
-        this(DuccService.getDuccLogger(HistoryManagerDb.class.getName()));
     }
-
-       public HistoryManagerDb(DuccLogger logger) 
-    {
-        
-        this.logger = logger;
-        dburl = System.getProperty("ducc.state.database.url");
+    
+    
+       private boolean init(String dburl, DbManager dbm)
+        throws Exception
+    {        
+               String methodName = "init";
+        boolean ret = true;
+        logger.info(methodName, null, "Initializing OR persistence over the 
database");
         try {
-            dbManager = new DbManager(dburl, logger);
-            dbManager.init();
-            // TODO TODO
-            dbManager.declareIntent(new OIntentMassiveInsert());
-            logger.warn("<CTR>.HistoryManagerDb", null, "****MUST FIX DECLARE 
INTENT****");
+            if ( dbm != null ) {
+                this.dbManager = dbm;
+            } else {
+                dbManager = new DbManager(dburl, logger);
+                dbManager.init();
+            }
+
+            // prepare some statements
+            DbHandle h = dbManager.open();
+            jobPrepare         = h.prepare("INSERT INTO " + JOB_TABLE + " 
(ducc_dbid, type, history, work) VALUES (?, ?, ?, ?) IF NOT EXISTS;");          
  
+            reservationPrepare = h.prepare("INSERT INTO " + RES_TABLE + " 
(ducc_dbid, type, history, work) VALUES (?, ?, ?, ?) IF NOT EXISTS;");          
  
+            servicePrepare     = h.prepare("INSERT INTO " + SVC_TABLE + " 
(ducc_dbid, type, history, work) VALUES (?, ?, ?, ?) IF NOT EXISTS;");          
  
+            ckptPrepare = h.prepare("INSERT INTO " + CKPT_TABLE + " (id, work, 
p2jmap) VALUES (?, ?, ?);");            
+
         } catch ( Exception e ) {
-            logger.error("HisstoryManagerDb", null, "Cannot open the history 
database:", e);
-        }        
+            logger.error(methodName, null, "Cannot open the history 
database:", e);
+            throw e;
+        }
+        return ret;
        }
 
-    public void setLogger(DuccLogger logger)
+    public boolean init(DuccLogger logger)
+        throws Exception
     {
         this.logger = logger;
+        String historyUrl = System.getProperty("ducc.state.database.url");
+        return init(historyUrl, null);
+    }
+
+    // package only, for the loader
+    boolean init(DuccLogger logger, DbManager dbManager)
+       throws Exception
+    {
+       this.logger = logger;
+        String stateUrl = System.getProperty("ducc.state.database.url");
+        return init(stateUrl, dbManager);
     }
 
-    // 
----------------------------------------------------------------------------------------------------
-    // Jobs section
 
     /**
-     * Common code to save a job in an open handle.  Caller will commit or 
fail as needed.
+     * Schema gen.  Do anything you want to make the schema, but notice that 
DbUtil has a few convenience methods if
+     * you want to define your schema in a magic enum.
      */
-    void saveJobNoCommit(DbHandle h, IDuccWorkJob j, DbVertex type, DbCategory 
dbcat)
-        throws Exception
+    static ArrayList<SimpleStatement> mkSchema(String tablename)
+       throws Exception
     {
-       String methodName = "saveJobNoCommit";
-        Long nowP =  System.currentTimeMillis();
-        // Nuke the command lines
-        DuccWorkPopDriver driver = j.getDriver();
-        //ICommandLine driverCl = null;
-        IDuccProcessMap jdProcessMap = null;
-
-        int size = 0;
-
-        if ( driver != null ) {
-            //driverCl = driver.getCommandLine();
-            //driver.setCommandLine(null);
-            jdProcessMap =  driver.getProcessMap();
-            driver.setProcessMap(null);
-        }
+        ArrayList<SimpleStatement> ret = new ArrayList<SimpleStatement>();
 
-        //ICommandLine jobCl    = j.getCommandLine();
-        //j.setCommandLine(null);
+        StringBuffer buf = new StringBuffer("CREATE TABLE IF NOT EXISTS " + 
tablename + " (");
+        buf.append(DbUtil.mkSchema(OrWorkProps.values()));
+        buf.append(")");
+        buf.append("WITH CLUSTERING ORDER BY (ducc_dbid desc)");
+
+        ret.add(new SimpleStatement(buf.toString()));
+        ret.add(new SimpleStatement("CREATE INDEX IF NOT EXISTS ON " +  
tablename + "(ducc_dbid)"));
+        ret.add(new SimpleStatement("CREATE INDEX IF NOT EXISTS ON " +  
tablename + "(history)"));
 
-        IDuccPerWorkItemStatistics stats = 
j.getSchedulingInfo().getPerWorkItemStatistics();
+        return ret;
+    }
 
-        if ( stats != null ) {
-            if (Double.isNaN(stats.getStandardDeviation()) ) {
-                stats.setStandardDeviation(0.0);
-            }
-        }
+    static ArrayList<SimpleStatement> mkSchema()
+       throws Exception
+    {
+        ArrayList<SimpleStatement> ret = new ArrayList<SimpleStatement>();
 
-        // Pull process map so we can put processes in their own records
-        IDuccProcessMap processMap = j.getProcessMap();
-        j.setProcessMap(null);
-
-        Gson g = DbHandle.mkGsonForJob();
-
-        String dbJob = g.toJson(j);
-        size += dbJob.length();
-
-        // Must repair these things because OR continues to use the job after 
it has been
-        // written to history.
-        j.setProcessMap(processMap);
-        //j.setCommandLine(jobCl);
-        if ( driver != null ) {
-            //driver.setCommandLine(driverCl);
-            driver.setProcessMap(jdProcessMap);
-        }
-        
-        OrientVertex savedJob = h.saveObject(type, 
j.getDuccId().getFriendly(), dbJob, dbcat);
-    
-        List<OrientVertex> savedJPs = new ArrayList<OrientVertex>();
-        List<OrientVertex> savedJDs = new ArrayList<OrientVertex>();
-        for (DuccId did : processMap.keySet()) {
-            Long pid = did.getFriendly();
-            
-            IDuccProcess p = processMap.get(did);
-            String proc = g.toJson(p);
-            size += proc.length();
-            
-            savedJPs.add(h.saveObject(DbVertex.Process, pid, proc, dbcat));
-            // logger.info(methodName, p.getDuccId(), "2 ----------> Time to 
save process", System.currentTimeMillis() - nowP);
-                         
-        }
-        
-        if ( driver != null ) {
-            for (DuccId did : jdProcessMap.keySet()) {
-                Long pid = did.getFriendly();
-                
-                IDuccProcess p = jdProcessMap.get(did);
-                String proc = g.toJson(p);
-                size += proc.length();
-                
-                savedJDs.add(h.saveObject(DbVertex.Process, pid, proc, dbcat));
-                // logger.info(methodName, p.getDuccId(), "2 ----------> Time 
to save process", System.currentTimeMillis() - nowP);
-                
-            }
-            h.addEdges(savedJob, savedJDs, DbEdge.JdProcess);
-        }
-        
-        h.addEdges(savedJob, savedJPs, DbEdge.JpProcess);
+        ret.addAll(mkSchema(JOB_TABLE));
+        ret.addAll(mkSchema(RES_TABLE));
+        ret.addAll(mkSchema(SVC_TABLE));
+
+        StringBuffer buf = new StringBuffer("CREATE TABLE IF NOT EXISTS " + 
CKPT_TABLE + " (");
+        buf.append(DbUtil.mkSchema(OrCkptProps.values()));
+        buf.append(")");
+        ret.add(new SimpleStatement(buf.toString()));
 
-        logger.info(methodName, j.getDuccId(), "----------> Time to save job", 
System.currentTimeMillis() - nowP, "json size", size, "nprocesses", 
processMap.size());
-        
+        return ret;
     }
-    
-    private void saveJobInternal(IDuccWorkJob j, DbVertex type, boolean safe, 
DbCategory dbcat)
-        throws Exception 
-    {
-
-        // It seems that services instances are represented as jobs without 
drivers.  So we use
-        // the common code here, passing in the vertex type, for both jobs and 
services.
 
-        String methodName = "saveJob";
-        logger.info(methodName, j.getDuccId(), "Saving: type:", type.pname(), 
"safe:", safe, "DbCategory:", dbcat);
+    // 
----------------------------------------------------------------------------------------------------
+    // Jobs section
 
-               Long id = j.getDuccId().getFriendly();
-        DbHandle h = null;
-        try {
+    void saveWork(PreparedStatement s, IDuccWork w, boolean isHistory)
+        throws Exception
+    {
+       String methodName = "saveWork";
+        Long nowP =  System.currentTimeMillis();
+        String type = null;
+        if ( w instanceof IDuccWorkJob ) {
+            type = "job";
+        } else if ( w instanceof IDuccWorkReservation ) {
+            type = "reservation";
+        } else if ( w instanceof IDuccWorkService ) {
+            type = "service";
+        } else {
+               throw new IllegalArgumentException("Improper object passed to 
saveWork");
+        }
+
+        logger.info(methodName, w.getDuccId(), "-------- saving " + type);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream out = new ObjectOutputStream(baos);
+        out.writeObject(w);
+        out.close();
+        byte[] bytes = baos.toByteArray();
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
 
-            if ( safe ) {
-                h = dbManager.open(); 
-            } else {
-                h = dbManager.openNoLog();
-            }
+        DbHandle h = dbManager.open();
+        h.saveObject(jobPrepare,  w.getDuccId().getFriendly(), type, 
isHistory, buf);
 
-            if ( safe && h.thingInDatabase(id, type, dbcat) ) {
-                logger.warn(methodName, j.getDuccId(), "Not overwriting saved 
job.");
-                h.close();
-                return;
-            } 
-        } catch ( Exception e ) {
-            if ( h != null ) h.close();
-            throw e;
-        }
-
-        try {
-            saveJobNoCommit(h, j, type, dbcat);
-        } catch ( Exception e ) {
-            h.rollback();
-            logger.error(methodName, j.getDuccId(), "Cannot store job", e);
-            throw e;
-        } finally {
-            Long nowP =  System.currentTimeMillis();
-            h.commit();
-            logger.info(methodName, j.getDuccId(), "Time to commit", 
System.currentTimeMillis() - nowP);
-            h.close();
-        }
-       }
+        logger.info(methodName, w.getDuccId(), "----------> Time to save", 
type, ":", System.currentTimeMillis() - nowP, "Size:", bytes.length, "bytes."); 
       
+    }
 
     /**
-     * For use by the loader, load it without the existence check; the 
assumption this is a first-time load
-     * and the check isn't needed.  This saves history only.
+     * Part of history management, recover the indicated job from history.
      */
-    public void saveJobUnsafe(IDuccWorkJob j)
+    <T> T restoreWork(Class<T> cl, String tablename, long friendly_id)
         throws Exception
     {
-        saveJobInternal(j, DbVertex.Job, false, DbCategory.History);
-    }
+       String methodName = "restoreWork";
+        T ret = null;
+        DbHandle h = null;
 
+        h = dbManager.open();
+        String cql = "SELECT WORK FROM " + tablename + " WHERE DUCC_DBID=" + 
Long.toString(friendly_id);
+        ResultSet rs = h.execute(cql);
+        for ( Row r : rs ) {
+            logger.info(methodName, null, "----- Restoring", friendly_id); 
+            ByteBuffer bbWork = r.getBytes("work");
+            
+            byte[] workbytes = bbWork.array();
+            ByteArrayInputStream bais = new ByteArrayInputStream(workbytes);
+            ObjectInputStream ois = new ObjectInputStream(bais);
+            ret= (T) ois.readObject();
+            ois.close();            
+        } 
+        
+        return ret;
+    }
+    
     /**
-     * For use by normal operation: forces an existence check.  This saves 
history only.
+     * Part of history management, recover the indicated jobs from history.
+     *
+     * Reminder to self: we need to pass Class<T> cl so the compiler can infer T.
      */
-       public void saveJob(IDuccWorkJob j)
-        throws Exception 
-    {
-        saveJobInternal(j, DbVertex.Job, true, DbCategory.History);
-    }
-
-       
-    private IDuccWorkJob restoreJobInternal(DbHandle h, OrientVertex v)
+    public <T> ArrayList<T> restoreSeveralThings(Class<T> cl, String 
tablename, long max)
         throws Exception
     {
-        IDuccWorkJob j = null;
-
-        ODocument d = v.getRecord();
-        String json = d.toJSON();
-        JsonObject jo = mkJsonObject(json);
+       String methodName = "restoreSeveralThings";
 
-        Gson g = DbHandle.mkGsonForJob();        
-        j      = g.fromJson(jo, DuccWorkJob.class);
-
-        // System.out.println(g.toJson(jo));
-        
-        IDuccProcessMap pm = j.getProcessMap();              // seems to get 
set by default when job is recovered
-        Iterable<Edge> ed = v.getEdges(Direction.OUT, 
DbEdge.JpProcess.pname());
-        for ( Edge e : ed ) {
-            OrientEdge   oe = (OrientEdge) e;
-            OrientVertex ov = oe.getVertex(Direction.IN);
-            
-            ODocument    pd    = ov.getRecord();
-            String       pjson = pd.toJSON();
-            
-            IDuccProcess pe = g.fromJson(pjson, DuccProcess.class);
-            pm.addProcess(pe);
-        }
+        ArrayList<T> ret = new ArrayList<T>();
+        DbHandle h = dbManager.open();
+        SimpleStatement s = new SimpleStatement("SELECT * from " + tablename + 
" limit " + max);
+        s.setFetchSize(100);
+        long now = System.currentTimeMillis();
 
-        DuccWorkPopDriver driver = j.getDriver();
-        if ( driver != null ) {
-            pm = new DuccProcessMap();
-            driver.setProcessMap(pm);                     // seems NOT to get 
set when driver is reconstituted
-            ed = v.getEdges(Direction.OUT, DbEdge.JdProcess.pname());
-            for ( Edge e : ed ) {
-                OrientEdge   oe = (OrientEdge) e;
-                OrientVertex ov = oe.getVertex(Direction.IN);
-                
-                ODocument    pd    = ov.getRecord();
-                String       pjson = pd.toJSON();
-                
-                IDuccProcess pe = g.fromJson(pjson, DuccProcess.class);
-                pm.addProcess(pe);
+        try {
+            int count = 0;
+            int nbytes = 0;
+            ResultSet rs = h.execute(s);
+            for ( Row r : rs ) {
+                count++;
+                ByteBuffer b = r.getBytes("work");
+                byte[] workbytes = b.array();
+                nbytes += workbytes.length;
+
+                ByteArrayInputStream bais = new 
ByteArrayInputStream(workbytes);
+                ObjectInputStream ois = new ObjectInputStream(bais);
+                ret.add( (T) ois.readObject());
+                ois.close();            
+                count++;
             }
-        }
+            
+            logger.info(methodName, null, "Found", count, "results. Total 
bytes", nbytes, "Time:",  System.currentTimeMillis() - now);
+               } catch (Exception e) {
+            logger.error(methodName, null, "Error fetching history:", e);
+               }
+        return ret;
+    }
 
-        // Now must hack around becase this 'n that and JSON can't work out 
some things
-        String ct = (String) ((ADuccWork)j).getCompletionTypeObject();
-        j.setCompletionType(JobCompletionType.valueOf(ct));
 
-        String so = (String) ((ADuccWork)j).getStateObject();
-        j.setJobState(JobState.valueOf(so));
-        
-        return j;
+    /**
+     * For use by normal operation: forces an existence check.  This saves 
history only.
+     */
+       public void saveJob(IDuccWorkJob j)
+        throws Exception 
+    {
+        saveWork(jobPrepare, j, true);
     }
 
+       
     /**
      * Part of history management, recover ths indicated job from history.
      */
     public IDuccWorkJob restoreJob(long friendly_id)
         throws Exception
     {
-        DuccWorkJob ret = null;
-        DbHandle h = null;
-        try {
-            h = dbManager.open();
-            Iterable<Vertex> q =  h.select("SELECT * FROM " + 
DbVertex.Job.pname() + " WHERE ducc_dbid=" + friendly_id + 
-                                           " AND " + DbConstants.DUCC_DBCAT + 
"='" + DbCategory.History.pname() + "'");
-            for ( Vertex v : q ) {
-                // There's only 1 unless db is broken.
-                return restoreJobInternal(h, (OrientVertex) v);
-            }
-        } finally {
-            h.close();
-        }
-
-        return ret;
+        return (IDuccWorkJob) restoreWork(IDuccWorkJob.class, JOB_TABLE, 
friendly_id);
     }
     
     /**
@@ -339,22 +280,7 @@ public class HistoryManagerDb
     public ArrayList<IDuccWorkJob> restoreJobs(long max)
         throws Exception
     {
-        ArrayList<IDuccWorkJob> ret = new ArrayList<IDuccWorkJob>();
-        DbHandle h = null;
-        try {
-            h = dbManager.open();
-            Iterable<Vertex> q =  h.select("SELECT * FROM " + 
DbVertex.Job.pname() + 
-                                           " where " + DbConstants.DUCC_DBCAT 
+"='" + DbCategory.History.pname() + 
-                                           "' ORDER BY ducc_dbid DESC LIMIT "+ 
max);
-            for ( Vertex v : q ) {
-                IDuccWorkJob j = restoreJobInternal(h, (OrientVertex) v);
-                ret.add(j);
-            }
-        } finally {
-            h.close();
-        }
-
-        return ret;
+        return restoreSeveralThings(IDuccWorkJob.class, JOB_TABLE, max);
     }
     // End of jobs section
     // 
----------------------------------------------------------------------------------------------------
@@ -363,179 +289,21 @@ public class HistoryManagerDb
     // 
----------------------------------------------------------------------------------------------------
     // Reservations section
 
-       private void saveReservationNoCommit(DbHandle h, IDuccWorkReservation 
r, DbCategory dbcat)
-        throws Exception 
-    {
-        String methodName = "saveReservationNoCommit";
-        long now = System.currentTimeMillis();
-
-        List<JdReservationBean> l = r.getJdReservationBeanList();
-        if ( l != null ) {
-            for (JdReservationBean b : l ) {
-                ConcurrentHashMap<DuccId, SizeBytes> map = b.getMap();
-                for ( DuccId k : map.keySet() ) {
-                    logger.info(methodName, null, "SAVE ===> " + 
k.getFriendly() + " " + k.getUnique() + " : " + map.get(k));
-                }
-            }
-        }
-
-
-
-
-               Long id = r.getDuccId().getFriendly();
-        logger.info(methodName, r.getDuccId(), "Saving.");
-   
-        // Nuke the command lines
-
-        IDuccReservationMap resmap = r.getReservationMap();
-        r.setReservationMap(null);
-
-        Gson g = DbHandle.mkGsonForJob();
-
-        String dbres = g.toJson(r);
-        // logger.info(methodName, null, "------------------- Reservation 
JSON: " + dbres);
-        
-        // Must repair these things because OR continues to use the job after 
it has been
-        // written to history.
-        r.setReservationMap(resmap);
-        
-        OrientVertex savedRes = h.saveObject(DbVertex.Reservation, id, dbres, 
dbcat);
-        
-        List<OrientVertex> savedHosts = new ArrayList<OrientVertex>();
-        for (DuccId did : resmap.keySet()) {
-            Long pid = did.getFriendly();
-            
-            IDuccReservation p = resmap.get(did);
-            String proc = g.toJson(p);
-            
-            savedHosts.add(h.saveObject(DbVertex.Process, pid, proc, dbcat));
-            // logger.info(methodName, p.getDuccId(), "2 ----------> Time to 
save process", System.currentTimeMillis() - nowP);
-            
-        }
-        
-        h.addEdges(savedRes, savedHosts, DbEdge.JpProcess);
-        logger.info(methodName, r.getDuccId(), "----------> Total reservation 
save time:", System.currentTimeMillis() - now, "nPE", resmap.size());
-
-       }
-
-    private void saveReservationInternal(IDuccWorkReservation r, boolean safe, 
DbCategory dbcat)
-        throws Exception 
-    {
-        String methodName = "saveReservation";
-
-               Long id = r.getDuccId().getFriendly();
-        DbHandle h = null;
-        try {
-            if ( safe ) {
-                h = dbManager.open(); 
-            } else {
-                h = dbManager.openNoTx();
-            }
-            if ( safe && h.thingInDatabase(id, DbVertex.Reservation, dbcat) ) {
-                h.close();
-                return;
-            } 
-        } catch ( Exception e ) {
-            logger.warn(methodName, r.getDuccId(), e);
-            h.close();
-            return;
-        }
-
-        try {
-            saveReservationNoCommit(h, r, dbcat);
-        } catch ( Exception e ) {
-            h.rollback();
-            logger.error(methodName, r.getDuccId(), "Cannot store 
reservation:", e);
-        } finally {
-            h.commit();
-            h.close();
-        }
-
-       }
-
     // Save to history only
        public void saveReservation(IDuccWorkReservation r) 
         throws Exception 
     {
-        saveReservationInternal(r, true, DbCategory.History);
-    }
-
-       public void saveReservationUnsafe(IDuccWorkReservation r) 
-        throws Exception 
-    {
-        saveReservationInternal(r, false, DbCategory.History);
+        saveWork(reservationPrepare, r, true);
     }
 
-    private IDuccWorkReservation restoreReservationInternal(DbHandle h, 
OrientVertex v)
-        throws Exception
-    {
-        // String methodName = "restoreReservationInternal";
-        IDuccWorkReservation r = null;
-
-        ODocument d = v.getRecord();
-        String json = d.toJSON();
-        JsonObject jo = mkJsonObject(json);
-
-        Gson g = DbHandle.mkGsonForJob();        
-        // logger.info(methodName, null, g.toJson(jo));
-
-        r      = g.fromJson(jo, DuccWorkReservation.class);
-
-        //List<JdReservationBean> l = r.getJdReservationBeanList();
-        //if ( l != null ) {
-            //for (JdReservationBean b : l ) {
-                //ConcurrentHashMap<DuccId, SizeBytes> map = b.getMap();
-                //for ( DuccId k : map.keySet() ) {
-                //    logger.info(methodName, null, "REST ===> " + 
k.getFriendly() + " " + k.getUnique() + " : " + map.get(k));
-                //}
-            //}
-        //}
-        
-        IDuccReservationMap rm = r.getReservationMap();              // seems 
to get set by default when job is recovered
-        Iterable<Edge> ed = v.getEdges(Direction.OUT, 
DbEdge.JpProcess.pname());
-        for ( Edge e : ed ) {
-            OrientEdge   oe = (OrientEdge) e;
-            OrientVertex ov = oe.getVertex(Direction.IN);
-            
-            ODocument    pd    = ov.getRecord();
-            String       pjson = pd.toJSON();
-            
-            IDuccReservation rr = g.fromJson(pjson, DuccReservation.class);
-            rm.addReservation(rr);
-        }
-
-        // Now must hack around becase this 'n that and JSON can't work out 
some things
-        String ct = (String) ((ADuccWork)r).getCompletionTypeObject();
-        r.setCompletionType(ReservationCompletionType.valueOf(ct));
-
-        String so = (String) ((ADuccWork)r).getStateObject();
-        r.setReservationState(ReservationState.valueOf(so));
-        
-        return r;
-    }
-       
     /**
      * Part of history management, recover ths indicated reservation from 
history.
      */
        public IDuccWorkReservation restoreReservation(long duccid)
         throws Exception
     {
-        DuccWorkReservation ret = null;
-        DbHandle h = null;
-        try {
-            h = dbManager.open();
-            Iterable<Vertex> q =  h.select("SELECT * FROM " + 
DbVertex.Reservation.pname() + " WHERE ducc_dbid=" + duccid + 
-                                           " AND " + DbConstants.DUCC_DBCAT 
+"='" + DbCategory.History.pname() + "'");
-            for ( Vertex v : q ) {
-                // There's only 1 unless db is broken.
-                return restoreReservationInternal(h, (OrientVertex) v);
-            }
-        } finally {
-            h.close();
-        }
-
-        return ret;
-       }
+        return (IDuccWorkReservation) restoreWork(IDuccWorkReservation.class, 
RES_TABLE, duccid);
+    }
        
     /**
      * Part of history management, recover ths indicated reservations from 
history.
@@ -543,44 +311,20 @@ public class HistoryManagerDb
        public ArrayList<IDuccWorkReservation> restoreReservations(long max) 
                throws Exception
     {
-        ArrayList<IDuccWorkReservation> ret = new 
ArrayList<IDuccWorkReservation>();
-        DbHandle h = null;
-        try {
-            h = dbManager.open();
-            Iterable<Vertex> q =  h.select("SELECT * FROM " + 
DbVertex.Reservation.pname() + 
-                                           " WHERE " + DbConstants.DUCC_DBCAT 
+ "='" + DbCategory.History.pname() + "'" + 
-                                           " ORDER BY ducc_dbid DESC LIMIT "+ 
max);
-            for ( Vertex v : q ) {
-                IDuccWorkReservation j = restoreReservationInternal(h, 
(OrientVertex) v);
-                ret.add(j);
-            }
-        } finally {
-            h.close();
-        }
+        return restoreSeveralThings(IDuccWorkReservation.class, RES_TABLE, 
max);
+    }
 
-        return ret;
-       }
     // End of reservations section
     // 
----------------------------------------------------------------------------------------------------
        
 
     // 
----------------------------------------------------------------------------------------------------
     // Services section
-    // public void serviceSave(IDuccWorkService s) 
-    //     throws Exception
-    // {
-    // }
 
     public void saveService(IDuccWorkService s)
        throws Exception
     {
-        saveJobInternal((IDuccWorkJob)s, DbVertex.ServiceInstance, true, 
DbCategory.History);
-    }
-
-    public void saveServiceUnsafe(IDuccWorkService s)
-       throws Exception
-    {
-        saveJobInternal((IDuccWorkJob)s, DbVertex.ServiceInstance, false, 
DbCategory.History);
+        saveWork(servicePrepare, s, true);
     }
 
        
@@ -590,19 +334,7 @@ public class HistoryManagerDb
        public IDuccWorkService restoreService(long duccid)
                throws Exception
     {
-        DbHandle h = null;
-        try {
-            h = dbManager.open();
-            Iterable<Vertex> q =  h.select("SELECT * FROM " + 
DbVertex.ServiceInstance.pname() + " WHERE ducc_dbid=" + duccid + 
-                                           " AND " + DbConstants.DUCC_DBCAT + 
"='" + DbCategory.History.pname() + "'");
-            for ( Vertex v : q ) {
-                return restoreJobInternal(h, (OrientVertex) v);
-            }
-        } finally {
-            h.close();
-        }
-
-        return null;
+        return (IDuccWorkService) restoreWork(IDuccWorkService.class, 
SVC_TABLE, duccid);
        }
        
     /**
@@ -611,23 +343,7 @@ public class HistoryManagerDb
        public ArrayList<IDuccWorkService> restoreServices(long max) 
                throws Exception
     {
-        ArrayList<IDuccWorkService> ret = new ArrayList<IDuccWorkService>();
-        DbHandle h = null;
-        try {
-            h = dbManager.open();
-            Iterable<Vertex> q =  h.select("SELECT * FROM " + 
DbVertex.ServiceInstance.pname() + 
-                                           " WHERE " + DbConstants.DUCC_DBCAT 
+"='" + DbCategory.History.pname() + "'" +
-                                           " ORDER BY ducc_dbid DESC LIMIT "+ 
max);
-            for ( Vertex v : q ) {
-                IDuccWorkService j = restoreJobInternal(h, (OrientVertex) v);
-                ret.add(j);
-            }
-        } finally {
-            h.close();
-        }
-
-        return ret;
-
+        return restoreSeveralThings(IDuccWorkService.class, SVC_TABLE, max);
        }
     // End of services section
     // 
----------------------------------------------------------------------------------------------------
@@ -652,127 +368,92 @@ public class HistoryManagerDb
         long now = System.currentTimeMillis();
         boolean ret = true;
 
-        DbHandle h = null;
-        try {
-            h = dbManager.open(); 
-        } catch ( Exception e ) {
-            logger.warn(methodName, null, "Cannot open database.", e);
-            if ( h != null ) h.close();
-            return false;
-        }
-
         // We transactionally delete the old checkpoint, and then save the new 
one.  If something goes wrong we
         // rollback and thus don't lose stuff.  In theory.
-       try {
-           h.execute("DELETE VERTEX V where " + DbConstants.DUCC_DBCAT + "='" 
+ DbCategory.Checkpoint.pname() + "'");
-           Map<DuccId, IDuccWork> map = work.getMap();
-           for (IDuccWork w : map.values() ) {
-               switch(w.getDuccType()) {
-               case Job:
-                   saveJobNoCommit(h, (IDuccWorkJob) w, DbVertex.Job, 
DbCategory.Checkpoint);
-                   break;
-               case Service:
-                   saveJobNoCommit(h, (IDuccWorkJob) w, 
DbVertex.ServiceInstance, DbCategory.Checkpoint);
-                   break;
-               case Reservation:
-                   if ( w.getDuccId().getFriendly() == 282282 ) {
-                       int x = 0;
-                       x++;
-                   }
-                   saveReservationNoCommit(h, (IDuccWorkReservation) w, 
DbCategory.Checkpoint);
-                   break;
-               default:
-                   break;
-               }
-           } 
-           
-           Gson g = DbHandle.mkGsonForJob();
-           ProcessToJobList l = new ProcessToJobList(processToJob);
-           String json = g.toJson(l, l.getClass());
-           // logger.info(methodName, null, "ProcessToJob:", json);
-           h.saveObject(DbVertex.ProcessToJob, null, json, 
DbCategory.Checkpoint);
-           h.commit();
+        
+        // TODO: make the truncate and insert transactional
+        DbHandle h = dbManager.open();
+        h.truncate("ducc.orckpt");
+
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream out = new ObjectOutputStream(baos);
+            out.writeObject(work);
+            out.close();
+            byte[] bytes = baos.toByteArray();
+            ByteBuffer workbuf = ByteBuffer.wrap(bytes);
+
+            baos = new ByteArrayOutputStream();
+            out= new ObjectOutputStream(baos);
+            out.writeObject(processToJob);
+            out.close();
+            bytes = baos.toByteArray();
+            ByteBuffer mapbuf = ByteBuffer.wrap(bytes);
+            
+            h = dbManager.open();
+            h.saveObject(ckptPrepare, 0, workbuf, mapbuf);       
+
         } catch ( Exception e ) {
-            if ( h != null ) h.rollback();
             logger.error(methodName, null, "Cannot save ProcessToJob map", e);
             ret = false;
         } finally {
-            if ( h != null ) h.close();
             if ( ret ) logger.info(methodName, null, "Saved Orchestrator 
Checkpoint");
         }
 
-       logger.info(methodName, null, "Total time to save checkpoint:", 
System.currentTimeMillis() - now);
-       return ret;
+        logger.info(methodName, null, "Total time to save checkpoint:", 
System.currentTimeMillis() - now);
+        return ret;
     }
 
     /**
      * Orchestrator checkpoint.  Restore the checkpoint from the DB.  Caller 
must initialize
      * empty maps, which we fill in.
      */
-    public boolean restore(DuccWorkMap work, Map<DuccId, DuccId> processToJob)
+    public Pair<DuccWorkMap, Map<DuccId, DuccId>>  restore()
         throws Exception
     {
-       String methodName = "restore";
+        String methodName = "restore";
         DbHandle h = null;
-        boolean ret = true;
+        Pair<DuccWorkMap, Map<DuccId, DuccId>> ret = new Pair<DuccWorkMap, 
Map<DuccId, DuccId>>();
         try {
             h = dbManager.open();
-            // Select all the "top level" objects ith DUCC_LIVE=true.  When 
they get restored the
-            // attached object will get collected.
-
-            Iterable<Vertex> q =  h.select("SELECT * FROM V WHERE (" +
-                                            "@CLASS ='" + DbVertex.Job.pname() 
+
-                                            "' OR " + 
-                                            "@CLASS ='" + 
DbVertex.Reservation.pname() + 
-                                            "' OR " +  
-                                            "@CLASS ='" + 
DbVertex.ServiceInstance.pname() + 
-                                            "') AND "   + 
DbConstants.DUCC_DBCAT + "='" + DbCategory.Checkpoint.pname() + "'");
-
-            IDuccWork w = null;
-            for ( Vertex v : q ) {
-                String l = ((OrientVertex) v).getLabel();
-                if ( l.equals(DbVertex.Job.pname()) || 
l.equals(DbVertex.ServiceInstance.pname()) ) {
-                    w = restoreJobInternal(h, (OrientVertex) v);
-                } else if ( l.equals(DbVertex.Reservation.pname()) ) {
-                    w = restoreReservationInternal(h, (OrientVertex) v);
-                } else {
-                    logger.warn(methodName, null, "Unexpected record of type", 
l, "in checkpoint restore.");
+            String cql = "SELECT * FROM ducc.orckpt WHERE id=0";
+            ResultSet rs = h.execute(cql);
+            for ( Row r : rs ) {
+                logger.info(methodName, null, "Found checkpoint.");
+                ByteBuffer bbWork = r.getBytes("work");
+                ByteBuffer bbmap = r.getBytes("p2jmap");
+
+                byte[] workbytes = bbWork.array();
+                ByteArrayInputStream bais = new 
ByteArrayInputStream(workbytes);
+                ObjectInputStream ois = new ObjectInputStream(bais);
+                DuccWorkMap work = (DuccWorkMap) ois.readObject();
+                Map<DuccId, IDuccWork> map = work.getMap();
+                ois.close();
+
+                workbytes = bbmap.array();
+                bais = new ByteArrayInputStream(workbytes);
+                ois = new ObjectInputStream(bais);
+                Map<DuccId, DuccId> processToJob = (Map<DuccId, DuccId>) 
ois.readObject();
+                ois.close();
+
+                // hack because java serialization is stupid and won't call the 
no-args constructor - need
+                // to restore some transient fields
+                Set<DuccId> ids = work.getReservationKeySet();
+                for ( DuccId id : ids ) {
+                    DuccWorkReservation res = (DuccWorkReservation) 
work.findDuccWork(DuccType.Reservation, ""+id.getFriendly());
+                    if ( r != null ) res.initLogger();
                 }
                 
-                work.addDuccWork(w);
+                ret = new Pair(work, processToJob);
             }
 
-            q = h.select("SELECT FROM " + DbVertex.ProcessToJob.pname());
-            
-            int count = 0;
-            for ( Vertex vv : q ) {
-                if ( count > 1 ) {
-                    logger.error(methodName, null, "Oops - we have multiple 
ProcessToJob records.  Using the first one but it may be wrong.");
-                    break;
-                }
-
-                OrientVertex v = (OrientVertex) vv;
-                ODocument d = v.getRecord();
-                String json = d.toJSON();
-                logger.info(methodName, null, json);
-
-                Gson g = DbHandle.mkGsonForJob();
-
-                ProcessToJobList l = g.fromJson(json, ProcessToJobList.class);
-                l.fill(processToJob);
-            }
-
-        } catch ( Exception e ) {
+       } catch ( Exception e ) {
             logger.error(methodName, null, "Error restoring checkpoint:", e);
-            ret = false;
-        } finally {
-            if ( h != null ) h.close();
-        }
-
-
+        } 
+        
         return ret;
     }
-
+    
     // End of OR checkpoint save and restore
     // 
----------------------------------------------------------------------------------------------------
     

Added: 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/ReadCkpt.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/ReadCkpt.java?rev=1711088&view=auto
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/ReadCkpt.java
 (added)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/ReadCkpt.java
 Wed Oct 28 18:12:53 2015
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.database;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+
+/**
+ * A test routine, read and verify the OR checkpoint file.
+ */
+
+public class ReadCkpt
+{
+
+    DuccLogger logger = DuccLogger.getLogger(DbLoader.class, "DBLOAD");
+    String DUCC_HOME;
+    DbManager dbManager = null;
+    HistoryManagerDb hmd = null;
+
+    public ReadCkpt()
+       throws Exception
+    {
+        DUCC_HOME = System.getProperty("DUCC_HOME");        
+        if ( DUCC_HOME == null ) {
+            System.out.println("System proprety -DDUCC_HOME must be set.");
+            System.exit(1);
+        }
+        
+        String state_url = "bluej538";
+        System.setProperty("ducc.state.database.url", state_url);
+
+        dbManager = new DbManager(state_url, logger);
+        dbManager.init();
+
+        hmd = new HistoryManagerDb();
+    }
+
+    public void run()
+    {
+        DuccWorkMap work = new DuccWorkMap();
+        Map<DuccId, DuccId> processToJob = new HashMap<DuccId, DuccId>();
+
+    }
+
+    public static void main(String [] args)
+    {
+        try {
+                       ReadCkpt rc = new ReadCkpt();
+                       rc.run();
+               } catch (Exception e) {
+                       // TODO Auto-generated catch block
+                       e.printStackTrace();
+               }
+
+
+    }
+
+}

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java?rev=1711088&r1=1711087&r2=1711088&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java
 Wed Oct 28 18:12:53 2015
@@ -19,17 +19,16 @@
 
 package org.apache.uima.ducc.database;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.uima.ducc.common.persistence.rm.IRmPersistence;
 import org.apache.uima.ducc.common.utils.DuccLogger;
-import org.apache.uima.ducc.database.DbConstants.DbCategory;
-import org.apache.uima.ducc.database.DbConstants.DbVertex;
 
-import com.google.gson.Gson;
-import com.tinkerpop.blueprints.impls.orient.OrientVertex;
+import com.datastax.driver.core.SimpleStatement;
 
 /**
  * Manage saving and fetching of transient RM state.  The primary consumer is
@@ -67,6 +66,11 @@ public class RmStatePersistence
         init(stateUrl);
     }
 
+    public void shutdown()
+    {
+        dbManager.shutdown();
+    }
+
     public void clear()
         throws Exception
     {
@@ -74,41 +78,55 @@ public class RmStatePersistence
         DbHandle h = null;
         try {
             h = dbManager.open();
-            h.execute("DELETE VERTEX V where " + DbConstants.DUCC_DBCAT + "='" 
+ DbCategory.RmState.pname() + "'");
+            h.execute("TRUNCATE ducc.rmnodes");
         } catch ( Exception e ) {
             logger.error(methodName, null, "Cannot clear the database.", e);
-        } finally {
-            if ( h != null ) h.close();
-        }
+        } 
     }
 
-    public String toGson(Object o)
+    static List<SimpleStatement> mkSchema()
+       throws Exception
     {
-        // We need to define Instance creators and such so we do it in a 
common place
-        Gson g = DbHandle.mkGsonForJob();
-        return g.toJson(o);
+        List<SimpleStatement> ret = new ArrayList<SimpleStatement>();
+
+        StringBuffer buf = new StringBuffer("CREATE TABLE IF NOT EXISTS ducc." 
+ RmProperty.TABLE_NAME.pname() + " (");
+        buf.append(DbUtil.mkSchema(RmProperty.values()));
+        buf.append(")");    
+        ret.add(new SimpleStatement(buf.toString()));
+        return ret;
     }
 
-    public Object createMachine(String m, Properties props)
+    // static String[] mkSchemaItems()
+    // {
+    //     int size = RmProperty.values().length;
+    //     String[] ret = new String[size];
+    //     int ndx = 0;
+
+    //     for ( RmProperty n: RmProperty.values() ) {
+    //         String s = n.pname();
+    //         s = s + " " + DbUtil.typeToString(n.type());
+    //         if ( n.isPrimaryKey() ) {
+    //             s = s + " PRIMARY KEY";
+    //         }
+    //         ret[ndx++] = s;
+    //     }
+    //     return ret;
+    // }
+
+    public void createMachine(String m, Map<RmProperty, Object> props)
        throws Exception
     {
         String methodName = "createMachine";
         DbHandle h = dbManager.open();
-        Object ret = null;
         try {           
-            OrientVertex v = h.createProperties(DbConstants.DUCC_DBNODE, m, 
DbVertex.RmNode,  DbCategory.RmState, props);
-            ret = v.getId();
-            h.commit();
+            String cql = DbUtil.mkInsert("ducc.rmnodes", props);
+            h.execute(cql);
         } catch ( Exception e ) {
-            logger.error(methodName, null, "Update", m, "ROLLBACK: ", e);
-            if ( h != null ) h.rollback();
-        } finally {
-            if ( h != null ) h.close();
-        }
-        return ret;
+            logger.error(methodName, null, "Error creating new record:", e);
+        } 
     }
 
-    public void setProperties(Object dbid, String dbk, Object... props)
+    public void setProperties(String node, Object... props)
        throws Exception
     {
         String methodName = "setProperties";
@@ -121,37 +139,28 @@ public class RmStatePersistence
         DbHandle h = dbManager.open();
 
         try {           
-            h.updateProperties(dbid, props);
-            h.commit();
+            h.updateProperties("ducc.rmnodes", "WHERE name='" + node + "'", 
props);
         } catch ( Exception e ) {
-            logger.error(methodName, null, "Update", dbk, "ROLLBACK: ", e);
-            if ( h != null ) h.rollback();
-        } finally {
-            if ( h != null ) h.close();
-            logger.info(methodName, null, "Total time to update properties 
on", dbid.toString(), System.currentTimeMillis() - now);
+            logger.error(methodName, null, "Problem setting properties");
+        } finally {           
+            logger.info(methodName, null, "Total time to update properties 
on", System.currentTimeMillis() - now);
 
         }
         
     }
 
-    public void setProperty(Object dbid, String dbk, RmPropName k, Object v)
+    public void setProperty(String node, RmProperty k, Object v)
        throws Exception
     {
         String methodName = "setProperty";
-        long now = System.currentTimeMillis();
 
         DbHandle h = dbManager.open();
 
         try {           
-            h.updateProperty(dbid, k.pname(), v);
-            h.commit();
+            h.updateProperty("ducc.rmnodes", "name='" + node + "'", 
k.columnName(), v);
         } catch ( Exception e ) {
-            logger.error(methodName, null, "Update", dbk, "ROLLBACK: ", e);
-            if ( h != null ) h.rollback();
-        } finally {
-            if ( h != null ) h.close();
-            logger.info(methodName, null, "Total time to update property on", 
dbid.toString(), System.currentTimeMillis() - now);
-        }
+            logger.error(methodName, null, "Problem setting properties.");
+        } 
         
     }
     
@@ -172,28 +181,3 @@ public class RmStatePersistence
     }
 
 }
-
-/**
-    String name;
-    String nodepoolId;
-    long memory;
-    int order;
-    boolean blacklisted;                                         // UIMA-4142
-    boolean online;                                              // UIMA-4234
-    boolean responsive;                                          // UIMA-4234
-
-
-
-
-   Properties file for a node
-   name = string
-   ip   = string
-   state = <state>
-      states: vary status: online | offline
-              reporting  : present | absent
-   nodepool = string
-   quantum = string
-   class = string
-   scheduling policy = string
-   scheduled work = list of duccids of work on the node
- */


Reply via email to