// Added: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbTester.java
// URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbTester.java?rev=1703203&view=auto
// ==============================================================================
// --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbTester.java (added)
// +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbTester.java Tue Sep 15 14:31:24 2015
// @@ -0,0 +1,592 @@
package org.apache.uima.ducc.database;

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.uima.ducc.common.persistence.services.IStateServices;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation;
import org.apache.uima.ducc.transport.event.common.IDuccWorkService;

/**
 * Toy orientdb loader to load a historydb from ducc history.
 *
 * Each load method follows the same pattern: start a set of worker threads
 * that block on a queue, read the history files from a hard-coded directory
 * on the calling thread, hand every item to the workers through the queue,
 * poll {@link #counter} until it drops back to zero, and finally interrupt
 * and join the workers.
 *
 * {@link #counter} is the number of items that have been queued but not yet
 * fully handled by a worker.  It is shared by all load methods, so the load
 * methods must not be run concurrently with each other.
 */
public class DbTester
{
    DuccLogger logger = DuccLogger.getLogger(DbTester.class, "DBLOAD");

    // Both are created in run() before any worker thread is started.
    HistoryManagerDb hmd = null;
    StateServicesDb  ssd = null;

    String history_url = "remote:localhost/DuccHistory";
    String state_url   = "remote:localhost/DuccState";

    // Alternative: System.getProperty("user.home") + "/ducc_runtime/history/jobs"
    String jobHistory = "/home/ducc/ducc_runtime/history/jobs";

    // Alternative: System.getProperty("user.home") + "/ducc_runtime/history/reservations"
    String reservationHistory = "/home/ducc/ducc_runtime/history/reservations";

    // Alternative: System.getProperty("user.home") + "/ducc_runtime/history/services"
    String serviceHistory = "/home/ducc/ducc_runtime/history/services";

    // Alternative: System.getProperty("user.home") + "/ducc_runtime/history/services-registry"
    String serviceRegistryHistory = "/home/ducc/ducc_runtime/history/services-registry";

    // Alternative: System.getProperty("user.home") + "/ducc_runtime/state/services"
    String serviceRegistry = "/home/ducc/ducc_runtime/state/services";

    // Upper bound on the number of worker threads started by each load method.
    int nthreads = 20;

    // Items queued for the workers that have not been completely handled yet.
    AtomicInteger counter = new AtomicInteger(0);

    /**
     * Publishes the database URLs as the system properties
     * "ducc.history.database.url" and "ducc.state.database.url".
     */
    public DbTester()
        throws Exception
    {
        System.setProperty("ducc.history.database.url", history_url);
        System.setProperty("ducc.state.database.url", state_url);
    }

    /**
     * Best-effort close of a stream.
     *
     * @param in the stream to close; may be null, in which case nothing happens.
     */
    void closeStream(InputStream in)
    {
        if ( in == null ) return;
        try {
            in.close();
        } catch(Exception e) {
            // Deliberately ignored: nothing useful can be done about a failed close here.
        }
    }

    /**
     * Lists a directory without ever returning null.
     *
     * @param methodName caller's name, used for logging only.
     * @param dir        the directory to list.
     * @return the directory entries, or an empty array if File.listFiles()
     *         returned null (not a directory, or an I/O error).
     */
    private File[] listFiles(String methodName, File dir)
    {
        File[] files = dir.listFiles();
        if ( files == null ) {
            logger.warn(methodName, null, "Cannot list directory", dir);
            return new File[0];
        }
        return files;
    }

    /**
     * Deserializes the single object stored in a history file.
     *
     * NOTE(review): this uses Java-native deserialization; it is only
     * appropriate because the history files are expected to be trusted,
     * locally written files.  Do not point it at untrusted input.
     *
     * @param f the serialized history file.
     * @return the object read from the file.
     * @throws Exception if the file cannot be opened or deserialized.
     */
    private Object readSerialized(File f)
        throws Exception
    {
        FileInputStream   fis = null;
        ObjectInputStream in  = null;
        try {
            fis = new FileInputStream(f);
            in  = new ObjectInputStream(fis);
            return in.readObject();
        } finally {
            closeStream(in);
            closeStream(fis);
        }
    }

    /**
     * Interrupts every worker, which ends its blocking take(), then waits for
     * all of them to exit.
     *
     * @param methodName   caller's name, used for logging only.
     * @param threads      the worker threads to stop.
     * @param interruptMsg log text emitted before each interrupt.
     * @param joinMsg      log text emitted before each join.
     */
    private void stopWorkers(String methodName, Thread[] threads, String interruptMsg, String joinMsg)
    {
        for ( int i = 0; i < threads.length; i++ ) {
            logger.info(methodName, null, interruptMsg, i);
            threads[i].interrupt();
        }

        for ( int i = 0; i < threads.length; i++ ) {
            logger.info(methodName, null, joinMsg, i);
            try {
                threads[i].join();
            } catch ( InterruptedException e ) {
                // Deliberately ignored so that the remaining workers are still joined.
            }
        }
    }

    /**
     * Loads serialized jobs (*.dwj) from {@link #jobHistory} into the history
     * database.  At most max_to_load (currently 2) job files are processed.
     */
    public void loadJobs()
    {
        String methodName = "loadJobs";
        LinkedBlockingQueue<IDuccWorkJob> jobqueue = new LinkedBlockingQueue<IDuccWorkJob>();

        int max_to_load = 2;
        int nth = Math.min(nthreads, max_to_load);
        Thread[] threads = new Thread[nth];
        List<Long> ids = new ArrayList<Long>();

        for ( int i = 0; i < nth; i++ ) {
            threads[i] = new Thread(new JobLoader(jobqueue, ids));
            threads[i].start();
        }

        File[] files = listFiles(methodName, new File(jobHistory));
        logger.info(methodName, null, "Reading", files.length, "jobs.");

        int c = 0;
        for ( File f : files ) {
            if ( !f.toString().endsWith(".dwj") ) {
                logger.info(methodName, null, "Can't find history file", f);
                continue;
            }

            logger.info(methodName, null, "Loading file", c++, ":", f);
            try {
                long now = System.currentTimeMillis();
                IDuccWorkJob job = (IDuccWorkJob) readSerialized(f);
                logger.info(methodName, job.getDuccId(), "Time to read job:", System.currentTimeMillis() - now);
                // Count first, then publish, so a fast worker cannot drive the counter negative.
                counter.getAndIncrement();
                jobqueue.offer(job);
            } catch(Exception e) {
                logger.info(methodName, null, e);
            }
            // Checked after the try (not inside a finally) so a throwable is never discarded.
            if ( c >= max_to_load ) break;
        }

        while ( (c = counter.get()) != 0 ) {
            try {
                logger.info(methodName, null, "Waiting for loads to finish, counter is", c, "(job).");
                Thread.sleep(1000);
            } catch ( Exception e ) {
                // Deliberately ignored: keep polling until the workers have drained the queue.
            }
        }

        stopWorkers(methodName, threads, "Intterupt thread (job)", "Joining thread (job)");
    }

    /**
     * Loads serialized reservations (*.dwr) from {@link #reservationHistory}
     * into the history database.
     */
    public void loadReservations()
    {
        String methodName = "loadReservations";
        LinkedBlockingQueue<IDuccWorkReservation> queue = new LinkedBlockingQueue<IDuccWorkReservation>();

        int max_to_load = Integer.MAX_VALUE;
        int nth = Math.min(nthreads, max_to_load);
        Thread[] threads = new Thread[nth];
        ArrayList<Long> ids = new ArrayList<Long>();

        for ( int i = 0; i < nth; i++ ) {
            threads[i] = new Thread(new ReservationLoader(queue, ids));
            threads[i].start();
        }

        File[] files = listFiles(methodName, new File(reservationHistory));
        logger.info(methodName, null, "Reading", files.length, "reservation instances.");

        int c = 0;
        for ( File f : files ) {
            if ( !f.toString().endsWith(".dwr") ) {
                logger.info(methodName, null, "Can't find history file", f);
                continue;
            }

            logger.info(methodName, null, "Loading file", c++, ":", f);
            try {
                long now = System.currentTimeMillis();
                IDuccWorkReservation res = (IDuccWorkReservation) readSerialized(f);
                logger.info(methodName, res.getDuccId(), "Time to read reservation:", System.currentTimeMillis() - now);
                counter.getAndIncrement();
                queue.offer(res);
            } catch(Exception e) {
                logger.info(methodName, null, e);
            }
            if ( c >= max_to_load ) break;
        }

        while ( (c = counter.get()) != 0 ) {
            try {
                logger.info(methodName, null, "Waiting for reservation loads to finish, counter is", c);
                Thread.sleep(1000);
            } catch ( Exception e ) {
                // Deliberately ignored: keep polling until the workers have drained the queue.
            }
        }

        stopWorkers(methodName, threads, "Intterupt thread (reservations).", "Joining thread (reservations).");
    }

    /**
     * Loads serialized service instances (*.dws) from {@link #serviceHistory}
     * into the history database.
     */
    public void loadServices()
    {
        String methodName = "loadServices";
        LinkedBlockingQueue<IDuccWorkService> queue = new LinkedBlockingQueue<IDuccWorkService>();

        int max_to_load = Integer.MAX_VALUE;
        int nth = Math.min(nthreads, max_to_load);
        Thread[] threads = new Thread[nth];
        ArrayList<Long> ids = new ArrayList<Long>();

        for ( int i = 0; i < nth; i++ ) {
            threads[i] = new Thread(new ServiceLoader(queue, ids));
            threads[i].start();
        }

        File[] files = listFiles(methodName, new File(serviceHistory));
        logger.info(methodName, null, "Reading", files.length, "service instances.");

        int c = 0;
        for ( File f : files ) {
            if ( !f.toString().endsWith(".dws") ) {
                logger.info(methodName, null, "Can't find history file", f);
                continue;
            }

            logger.info(methodName, null, "Loading file", c++, ":", f);
            try {
                long now = System.currentTimeMillis();
                IDuccWorkService svc = (IDuccWorkService) readSerialized(f);
                logger.info(methodName, svc.getDuccId(), "Time to read service:", System.currentTimeMillis() - now);
                counter.getAndIncrement();
                queue.offer(svc);
            } catch(Exception e) {
                logger.info(methodName, null, e);
            }
            if ( c >= max_to_load ) break;
        }

        while ( (c = counter.get()) != 0 ) {
            try {
                logger.info(methodName, null, "Waiting for service loads to finish, counter is", c);
                Thread.sleep(1000);
            } catch ( Exception e ) {
                // Deliberately ignored: keep polling until the workers have drained the queue.
            }
        }

        stopWorkers(methodName, threads, "Intterupt thread (services).", "Joining thread (services).");
    }

    /**
     * Loads a service registry directory.  Every "X.svc" file found is queued
     * as the path "X" (suffix removed); the worker then reads "X.svc" and
     * "X.meta" and stores the pair through {@link #ssd}.
     *
     * @param registry directory holding the .svc / .meta property files.
     */
    public void loadServiceRegistry(String registry)
    {
        String methodName = "loadServiceRegistry";
        String suffix = ".svc";
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();

        int max_to_load = Integer.MAX_VALUE;
        int nth = Math.min(nthreads, max_to_load);
        Thread[] threads = new Thread[nth];
        ArrayList<Long> ids = new ArrayList<Long>();

        for ( int i = 0; i < nth; i++ ) {
            threads[i] = new Thread(new ServiceRegistrationLoader(queue, ids));
            threads[i].start();
        }

        File[] files = listFiles(methodName, new File(registry));
        logger.info(methodName, null, "Reading", files.length, "service files (2 per instance).");

        int c = 0;
        for ( File f : files ) {
            String s = f.toString();
            if ( s.endsWith(suffix) ) {
                // Strip the trailing suffix only; an earlier ".svc" inside the path must not be cut.
                String numeric = s.substring(0, s.length() - suffix.length());
                counter.getAndIncrement();
                queue.offer(numeric);

                if ( ++c >= max_to_load ) break;
            }
        }

        while ( (c = counter.get()) != 0 ) {
            try {
                logger.info(methodName, null, "Waiting for service registry loads to finish, counter is", c);
                Thread.sleep(1000);
            } catch ( Exception e ) {
                // Deliberately ignored: keep polling until the workers have drained the queue.
            }
        }

        stopWorkers(methodName, threads, "Intterupt thread (service registry).", "Joining thread (service registry).");
    }

    /**
     * Opens the history database and runs the loads.
     *
     * NOTE: the method currently returns right after loadJobs(); the
     * remaining steps are kept in place but are not executed.
     */
    void run()
    {
        String methodName = "run";
        try {
            hmd = new HistoryManagerDb(logger);

            // ---------- Load job history
            loadJobs();
            if ( true ) return;          // short-circuit: only the job load is currently exercised

            // ---------- Load reservation history
            loadReservations();

            // ---------- Load service instance and AP history
            loadServices();

            // ---------- Load service registry
            ssd = new StateServicesDb();
            ssd.init(logger);
            loadServiceRegistry(serviceRegistry);
            try {
                ssd.shutdown();
            } catch ( Exception e ) {
                logger.error(methodName, null, e);
            }

            // ---------- Load service registry history
            ssd = new StateServicesDb();
            ssd.init(logger);
            loadServiceRegistry(serviceRegistryHistory);
        } catch ( Exception e ) {
            logger.error(methodName, null, e);
        } finally {
            if ( hmd != null ) hmd.shutdown();
            if ( ssd != null ) ssd.shutdown();
        }
    }

    public static void main(String[] args)
    {
        DbTester dbl = null;
        try {
            dbl = new DbTester();
            dbl.run();
        } catch ( Exception e ) {
            e.printStackTrace();
        }
    }

    /**
     * Worker: takes jobs from the queue and saves them with hmd.saveJob()
     * until it is interrupted while waiting on the queue.
     */
    class JobLoader
        implements Runnable
    {
        BlockingQueue<IDuccWorkJob> queue;
        List<Long> ids;          // shared between workers; guarded by synchronized(ids)

        JobLoader(BlockingQueue<IDuccWorkJob> queue, List<Long> ids)
        {
            this.queue = queue;
            this.ids = ids;
        }

        public void run()
        {
            String methodName = "JobLoader.run";
            while ( true ) {
                IDuccWorkJob job = null;
                try {
                    logger.info(methodName, null, "About to take (job).");
                    job = queue.take();
                } catch ( InterruptedException e ) {
                    // The interrupt is the shutdown signal sent by the load method.
                    return;
                }
                logger.info(methodName, job.getDuccId(), "Took a job.");
                try {
                    hmd.saveJob(job);
                    synchronized(ids) {
                        ids.add(job.getDuccId().getFriendly());
                    }
                } catch(Exception e) {
                    logger.info(methodName, null, e);
                } finally {
                    // Always account for the item, otherwise a failed save makes the loader wait forever.
                    counter.getAndDecrement();
                }
            }
        }
    }

    /**
     * Worker: takes service instances from the queue and saves them with
     * hmd.saveServiceUnsafe() until it is interrupted while waiting on the queue.
     */
    class ServiceLoader
        implements Runnable
    {
        BlockingQueue<IDuccWorkService> queue;
        List<Long> ids;          // shared between workers; guarded by synchronized(ids)

        ServiceLoader(BlockingQueue<IDuccWorkService> queue, List<Long> ids)
        {
            this.queue = queue;
            this.ids = ids;
        }

        public void run()
        {
            String methodName = "ServiceLoader.run";
            while ( true ) {
                IDuccWorkService svc = null;
                try {
                    logger.info(methodName, null, "About to take (service).");
                    svc = queue.take();
                } catch ( InterruptedException e ) {
                    // The interrupt is the shutdown signal sent by the load method.
                    return;
                }
                logger.info(methodName, svc.getDuccId(), "Took a Service");
                try {
                    hmd.saveServiceUnsafe(svc);
                    synchronized(ids) {
                        ids.add(svc.getDuccId().getFriendly());
                    }
                } catch(Exception e) {
                    logger.info(methodName, null, e);
                } finally {
                    // Always account for the item, otherwise a failed save makes the loader wait forever.
                    counter.getAndDecrement();
                }
            }
        }
    }

    /**
     * Worker: takes reservations from the queue and saves them with
     * hmd.saveReservationUnsafe() until it is interrupted while waiting on the queue.
     */
    class ReservationLoader
        implements Runnable
    {
        BlockingQueue<IDuccWorkReservation> queue;
        List<Long> ids;          // shared between workers; guarded by synchronized(ids)

        ReservationLoader(BlockingQueue<IDuccWorkReservation> queue, List<Long> ids)
        {
            this.queue = queue;
            this.ids = ids;
        }

        public void run()
        {
            String methodName = "ReservationLoader.run";
            while ( true ) {
                IDuccWorkReservation res = null;
                try {
                    logger.info(methodName, null, "About to take (reservation).");
                    res = queue.take();
                } catch ( InterruptedException e ) {
                    // The interrupt is the shutdown signal sent by the load method.
                    return;
                }
                logger.info(methodName, res.getDuccId(), "Took a Reservation");
                try {
                    hmd.saveReservationUnsafe(res);
                    synchronized(ids) {
                        ids.add(res.getDuccId().getFriendly());
                    }
                } catch(Exception e) {
                    logger.info(methodName, null, e);
                } finally {
                    // Always account for the item, otherwise a failed save makes the loader wait forever.
                    counter.getAndDecrement();
                }
            }
        }
    }

    /**
     * Worker: takes a registry path (without suffix) from the queue, loads the
     * matching ".svc" and ".meta" property files and stores them with
     * ssd.storePropertiesUnsafe(), until it is interrupted while waiting on
     * the queue.
     */
    class ServiceRegistrationLoader
        implements Runnable
    {
        BlockingQueue<String> queue;
        List<Long> ids;          // shared between workers; guarded by synchronized(ids)

        ServiceRegistrationLoader(BlockingQueue<String> queue, List<Long> ids)
        {
            this.queue = queue;
            this.ids = ids;
        }

        public void run()
        {
            String methodName = "ServiceRegistrationLoader.run";
            while ( true ) {
                String id = null;
                try {
                    logger.info(methodName, null, "About to take (service id).");
                    id = queue.take();
                } catch ( InterruptedException e ) {
                    // The interrupt is the shutdown signal sent by the load method.
                    return;
                }
                logger.info(methodName, null, id, "Took a service id");

                FileInputStream svc_in = null;
                FileInputStream meta_in = null;
                try {
                    Properties svc_props = new Properties();
                    Properties meta_props = new Properties();

                    svc_in = new FileInputStream(id + ".svc");
                    meta_in = new FileInputStream(id + ".meta");
                    svc_props.load(svc_in);
                    meta_props.load(meta_in);

                    String sid = meta_props.getProperty(IStateServices.SvcProps.numeric_id.pname());
                    if ( sid == null ) {
                        logger.warn(methodName, null, "Cannot find service id in meta file for", id);
                    } else {
                        if ( id.indexOf(sid) < 0 ) {
                            // must do index of because the 'id' is a full path, not just the numeric id.  so we
                            // are satisfied with making sure the id is in the path.
                            throw new IllegalStateException("Service id and internal id do not match.");
                        }
                        DuccId did = new DuccId(Long.parseLong(sid));

                        ssd.storePropertiesUnsafe(did, svc_props, meta_props);

                        synchronized(ids) {
                            ids.add(did.getFriendly());
                        }
                    }
                } catch(Exception e) {
                    logger.info(methodName, null, e);
                } finally {
                    closeStream(svc_in);
                    closeStream(meta_in);
                    // Always account for the item, otherwise a failed load makes the loader wait forever.
                    counter.getAndDecrement();
                }
            }
        }
    }

}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java?rev=1703203&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java Tue Sep 15 14:31:24 2015 @@ -0,0 +1,1006 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+*/ +package org.apache.uima.ducc.database; + +import java.io.IOException; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.uima.ducc.common.DuccNode; +import org.apache.uima.ducc.common.IIdentity; +import org.apache.uima.ducc.common.Node; +import org.apache.uima.ducc.common.NodeIdentity; +import org.apache.uima.ducc.common.SizeBytes; +import org.apache.uima.ducc.common.main.DuccService; +import org.apache.uima.ducc.common.utils.DuccLogger; +import org.apache.uima.ducc.common.utils.id.DuccId; +import org.apache.uima.ducc.common.utils.id.IDuccId; +import org.apache.uima.ducc.database.DbConstants.DbCategory; +import org.apache.uima.ducc.database.DbConstants.DbEdge; +import org.apache.uima.ducc.database.DbConstants.DbVertex; +import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent; +import org.apache.uima.ducc.transport.cmdline.ICommandLine; +import org.apache.uima.ducc.transport.event.common.ADuccWork; +import org.apache.uima.ducc.transport.event.common.DuccProcess; +import org.apache.uima.ducc.transport.event.common.DuccProcessMap; +import org.apache.uima.ducc.transport.event.common.DuccReservation; +import org.apache.uima.ducc.transport.event.common.DuccWorkJob; +import org.apache.uima.ducc.transport.event.common.DuccWorkMap; +import org.apache.uima.ducc.transport.event.common.DuccWorkPopDriver; +import org.apache.uima.ducc.transport.event.common.DuccWorkReservation; +import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.JobCompletionType; +import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.ReservationCompletionType; +import org.apache.uima.ducc.transport.event.common.IDuccPerWorkItemStatistics; +import org.apache.uima.ducc.transport.event.common.IDuccProcess; +import 
org.apache.uima.ducc.transport.event.common.IDuccProcessMap; +import org.apache.uima.ducc.transport.event.common.IDuccProcessWorkItems; +import org.apache.uima.ducc.transport.event.common.IDuccReservation; +import org.apache.uima.ducc.transport.event.common.IDuccReservationMap; +import org.apache.uima.ducc.transport.event.common.IDuccSchedulingInfo; +import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo; +import org.apache.uima.ducc.transport.event.common.IDuccState.JobState; +import org.apache.uima.ducc.transport.event.common.IDuccState.ReservationState; +import org.apache.uima.ducc.transport.event.common.IDuccUimaAggregateComponent; +import org.apache.uima.ducc.transport.event.common.IDuccUimaDeployableConfiguration; +import org.apache.uima.ducc.transport.event.common.IDuccWork; +import org.apache.uima.ducc.transport.event.common.IDuccWorkJob; +import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation; +import org.apache.uima.ducc.transport.event.common.IDuccWorkService; +import org.apache.uima.ducc.transport.event.common.IRationale; +import org.apache.uima.ducc.transport.event.common.ITimeWindow; +import org.apache.uima.ducc.transport.event.common.JdReservationBean; +import org.apache.uima.ducc.transport.event.common.history.IHistoryPersistenceManager; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.InstanceCreator; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonParser; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; +import com.google.gson.TypeAdapter; +import com.google.gson.TypeAdapterFactory; +import com.google.gson.reflect.TypeToken; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonToken; +import 
com.google.gson.stream.JsonWriter; +import com.orientechnologies.orient.core.record.impl.ODocument; +import com.tinkerpop.blueprints.Direction; +import com.tinkerpop.blueprints.Edge; +import com.tinkerpop.blueprints.Vertex; +import com.tinkerpop.blueprints.impls.orient.OrientEdge; +import com.tinkerpop.blueprints.impls.orient.OrientVertex; + + +public class HistoryManagerDb + implements IHistoryPersistenceManager +{ + + + private DuccLogger logger = null; + private String dburl; + private DbManager dbManager; + + public HistoryManagerDb() + { + this(DuccService.getDuccLogger(HistoryManagerDb.class.getName())); + } + + public HistoryManagerDb(DuccLogger logger) + { + + this.logger = logger; + dburl = System.getProperty("ducc.state.database.url"); + try { + dbManager = new DbManager(dburl, logger); + dbManager.init(); + } catch ( Exception e ) { + logger.error("HisstoryManagerDb", null, "Cannot open the history database:", e); + } + } + + public void setLogger(DuccLogger logger) + { + this.logger = logger; + } + + // ---------------------------------------------------------------------------------------------------- + // Jobs section + + Gson mkGsonForJob() + { + // We need to define Instance creators and such so we do it in a common place + GsonBuilder gb = new GsonBuilder(); + + GenericInterfaceAdapter customAdapter = new GenericInterfaceAdapter(); + gb.serializeSpecialFloatingPointValues().setPrettyPrinting(); + gb.enableComplexMapKeySerialization(); + + gb.registerTypeAdapter(Node.class, new NodeInstanceCreator()); + gb.registerTypeAdapter(NodeIdentity.class, new NodeIdentityCreator()); + + //gb.registerTypeAdapter(IIdentity.class, new IdentityInstanceCreator()); + gb.registerTypeAdapter(IIdentity.class, customAdapter); + + gb.registerTypeAdapter(IDuccId.class, customAdapter); + gb.registerTypeAdapter(ITimeWindow.class, customAdapter); + gb.registerTypeAdapter(IDuccProcessWorkItems.class, customAdapter); + gb.registerTypeAdapter(IDuccUimaAggregateComponent.class, 
customAdapter); + gb.registerTypeAdapter(IUimaPipelineAEComponent.class, customAdapter); + gb.registerTypeAdapter(IRationale.class, customAdapter); + gb.registerTypeAdapter(IDuccUimaDeployableConfiguration.class, customAdapter); + gb.registerTypeAdapter(IDuccStandardInfo.class, customAdapter); + gb.registerTypeAdapter(IDuccSchedulingInfo.class, customAdapter); + gb.registerTypeAdapter(IDuccPerWorkItemStatistics.class, customAdapter); + gb.registerTypeAdapter(IDuccReservationMap.class, customAdapter); + gb.registerTypeAdapter(JdReservationBean.class, customAdapter); + + //ConcurrentHashMap<DuccId, Long> x = new ConcurrentHashMap<DuccId, Long>(); + //gb.registerTypeAdapter(x.getClass(), new MapAdaptor()); + + //gb.registerTypeAdapterFactory(new DuccTypeFactory()); + //Object obj = new ArrayList<IJdReservation>(); + //gb.registerTypeAdapter(obj.getClass(), customAdapter); + Gson g = gb.create(); + return g; + } + + /** + * Common code to save a job in an open handle. Caller will commit or fail as needed. 
+ */ + private void saveJobNoCommit(DbHandle h, IDuccWorkJob j, DbVertex type, DbCategory dbcat) + throws Exception + { + String methodName = "saveJobNoCommit"; + Long nowP = System.currentTimeMillis(); + // Nuke the command lines + DuccWorkPopDriver driver = j.getDriver(); + ICommandLine driverCl = null; + IDuccProcessMap jdProcessMap = null; + + if ( driver != null ) { + driverCl = driver.getCommandLine(); + driver.setCommandLine(null); + jdProcessMap = driver.getProcessMap(); + driver.setProcessMap(null); + } + + ICommandLine jobCl = j.getCommandLine(); + j.setCommandLine(null); + + IDuccPerWorkItemStatistics stats = j.getSchedulingInfo().getPerWorkItemStatistics(); + + if ( stats != null ) { + if (Double.isNaN(stats.getStandardDeviation()) ) { + stats.setStandardDeviation(0.0); + } + } + + // Pull process map so we can put processes in their own records + IDuccProcessMap processMap = j.getProcessMap(); + j.setProcessMap(null); + + Gson g = mkGsonForJob(); + + String dbJob = g.toJson(j); + + // Must repair these things because OR continues to use the job after it has been + // written to history. 
+ j.setProcessMap(processMap); + j.setCommandLine(jobCl); + if ( driver != null ) { + driver.setCommandLine(driverCl); + driver.setProcessMap(jdProcessMap); + } + + Object savedJob = h.saveObject(type, j.getDuccId().getFriendly(), dbJob, dbcat); + + List<Object> savedJPs = new ArrayList<Object>(); + List<Object> savedJDs = new ArrayList<Object>(); + for (DuccId did : processMap.keySet()) { + Long pid = did.getFriendly(); + + IDuccProcess p = processMap.get(did); + String proc = g.toJson(p); + + savedJPs.add(h.saveObject(DbVertex.Process, pid, proc, dbcat)); + // logger.info(methodName, p.getDuccId(), "2 ----------> Time to save process", System.currentTimeMillis() - nowP); + + } + + if ( driver != null ) { + for (DuccId did : jdProcessMap.keySet()) { + Long pid = did.getFriendly(); + + IDuccProcess p = jdProcessMap.get(did); + String proc = g.toJson(p); + + savedJDs.add(h.saveObject(DbVertex.Process, pid, proc, dbcat)); + // logger.info(methodName, p.getDuccId(), "2 ----------> Time to save process", System.currentTimeMillis() - nowP); + + } + h.addEdges(savedJob, savedJDs, DbEdge.JdProcess); + } + + h.addEdges(savedJob, savedJPs, DbEdge.JpProcess); + + logger.info(methodName, j.getDuccId(), "----------> Time to save job", System.currentTimeMillis() - nowP); + + } + + private void saveJobInternal(IDuccWorkJob j, DbVertex type, boolean safe, DbCategory dbcat) + throws Exception + { + + // It seems that services instances are represented as jobs without drivers. So we use + // the common code here, passing in the vertex type, for both jobs and services. 
+ + String methodName = "saveJob"; + logger.info(methodName, j.getDuccId(), "Saving: type:", type.pname(), "safe:", safe, "DbCategory:", dbcat); + + Long id = j.getDuccId().getFriendly(); + DbHandle h = null; + try { + if ( safe ) { + h = dbManager.open(); + } else { + h = dbManager.open(); + } + if ( safe && h.thingInDatabase(id, type, dbcat) ) { + logger.warn(methodName, j.getDuccId(), "Not overwriting saved job."); + h.close(); + return; + } + } catch ( Exception e ) { + if ( h != null ) h.close(); + throw e; + } + + try { + saveJobNoCommit(h, j, type, dbcat); + } catch ( Exception e ) { + h.rollback(); + logger.error(methodName, j.getDuccId(), "Cannot store job", e); + throw e; + } finally { + h.commit(); + } + } + + /** + * For use by the loader, load it without the existence check; the assumption this is a first-time load + * and the check isn't needed. This saves history only. + */ + public void saveJobUnsafe(IDuccWorkJob j) + throws Exception + { + saveJobInternal(j, DbVertex.Job, false, DbCategory.History); + } + + /** + * For use by normal operation: forces an existence check. This saves history only. 
+ */ + public void saveJob(IDuccWorkJob j) + throws Exception + { + saveJobInternal(j, DbVertex.Job, true, DbCategory.History); + } + + + private IDuccWorkJob restoreJobInternal(DbHandle h, OrientVertex v) + throws Exception + { + IDuccWorkJob j = null; + + ODocument d = v.getRecord(); + String json = d.toJSON(); + JsonObject jo = mkJsonObject(json); + + Gson g = mkGsonForJob(); + j = g.fromJson(jo, DuccWorkJob.class); + + // System.out.println(g.toJson(jo)); + + IDuccProcessMap pm = j.getProcessMap(); // seems to get set by default when job is recovered + Iterable<Edge> ed = v.getEdges(Direction.OUT, DbEdge.JpProcess.pname()); + for ( Edge e : ed ) { + OrientEdge oe = (OrientEdge) e; + OrientVertex ov = oe.getVertex(Direction.IN); + + ODocument pd = ov.getRecord(); + String pjson = pd.toJSON(); + + IDuccProcess pe = g.fromJson(pjson, DuccProcess.class); + pm.addProcess(pe); + } + + DuccWorkPopDriver driver = j.getDriver(); + if ( driver != null ) { + pm = new DuccProcessMap(); + driver.setProcessMap(pm); // seems NOT to get set when driver is reconstituted + ed = v.getEdges(Direction.OUT, DbEdge.JdProcess.pname()); + for ( Edge e : ed ) { + OrientEdge oe = (OrientEdge) e; + OrientVertex ov = oe.getVertex(Direction.IN); + + ODocument pd = ov.getRecord(); + String pjson = pd.toJSON(); + + IDuccProcess pe = g.fromJson(pjson, DuccProcess.class); + pm.addProcess(pe); + } + } + + // Now must hack around becase this 'n that and JSON can't work out some things + String ct = (String) ((ADuccWork)j).getCompletionTypeObject(); + j.setCompletionType(JobCompletionType.valueOf(ct)); + + String so = (String) ((ADuccWork)j).getStateObject(); + j.setJobState(JobState.valueOf(so)); + + return j; + } + + /** + * Part of history management, recover ths indicated job from history. 
+ */ + public IDuccWorkJob restoreJob(long friendly_id) + throws Exception + { + DuccWorkJob ret = null; + DbHandle h = null; + try { + h = dbManager.open(); + Iterable<Vertex> q = h.select("SELECT * FROM " + DbVertex.Job.pname() + " WHERE ducc_dbid=" + friendly_id + + " AND " + DbConstants.DUCC_DBCAT + "='" + DbCategory.History.pname()); + for ( Vertex v : q ) { + // There's only 1 unless db is broken. + return restoreJobInternal(h, (OrientVertex) v); + } + } finally { + h.close(); + } + + return ret; + } + + /** + * Part of history management, recover ths indicated jobs from history. + */ + public ArrayList<IDuccWorkJob> restoreJobs(long max) + throws Exception + { + ArrayList<IDuccWorkJob> ret = new ArrayList<IDuccWorkJob>(); + DbHandle h = null; + try { + h = dbManager.open(); + Iterable<Vertex> q = h.select("SELECT * FROM " + DbVertex.Job.pname() + + " where " + DbConstants.DUCC_DBCAT +"='" + DbCategory.History.pname() + + "' ORDER BY ducc_dbid DESC LIMIT "+ max); + for ( Vertex v : q ) { + IDuccWorkJob j = restoreJobInternal(h, (OrientVertex) v); + ret.add(j); + } + } finally { + h.close(); + } + + return ret; + } + // End of jobs section + // ---------------------------------------------------------------------------------------------------- + + + // ---------------------------------------------------------------------------------------------------- + // Reservations section + + private void saveReservationNoCommit(DbHandle h, IDuccWorkReservation r, DbCategory dbcat) + throws Exception + { + String methodName = "saveReservationNoCommit"; + + List<JdReservationBean> l = r.getJdReservationBeanList(); + if ( l != null ) { + for (JdReservationBean b : l ) { + ConcurrentHashMap<DuccId, SizeBytes> map = b.getMap(); + for ( DuccId k : map.keySet() ) { + logger.info(methodName, null, "SAVE ===> " + k.getFriendly() + " " + k.getUnique() + " : " + map.get(k)); + } + } + } + + + long now = System.currentTimeMillis(); + + Long id = r.getDuccId().getFriendly(); + 
logger.info(methodName, r.getDuccId(), "Saving."); + + // Nuke the command lines + + IDuccReservationMap resmap = r.getReservationMap(); + r.setReservationMap(null); + + Gson g = mkGsonForJob(); + + String dbres = g.toJson(r); + logger.info(methodName, null, "------------------- Reservation JSON: " + dbres); + + // Must repair these things because OR continues to use the job after it has been + // written to history. + r.setReservationMap(resmap); + + Object savedRes = h.saveObject(DbVertex.Reservation, id, dbres, dbcat); + + List<Object> savedHosts = new ArrayList<Object>(); + for (DuccId did : resmap.keySet()) { + Long pid = did.getFriendly(); + + IDuccReservation p = resmap.get(did); + String proc = g.toJson(p); + + savedHosts.add(h.saveObject(DbVertex.Process, pid, proc, dbcat)); + // logger.info(methodName, p.getDuccId(), "2 ----------> Time to save process", System.currentTimeMillis() - nowP); + + } + + h.addEdges(savedRes, savedHosts, DbEdge.JpProcess); + logger.info(methodName, r.getDuccId(), "----------> Total reservation save time:", System.currentTimeMillis() - now, "nPE", resmap.size()); + + } + + private void saveReservationInternal(IDuccWorkReservation r, boolean safe, DbCategory dbcat) + throws Exception + { + String methodName = "saveReservation"; + + Long id = r.getDuccId().getFriendly(); + DbHandle h = null; + try { + h = dbManager.open(); + if ( safe && h.thingInDatabase(id, DbVertex.Reservation, dbcat) ) { + h.close(); + return; + } + } catch ( Exception e ) { + logger.warn(methodName, r.getDuccId(), e); + h.close(); + return; + } + + try { + saveReservationNoCommit(h, r, dbcat); + } catch ( Exception e ) { + h.rollback(); + logger.error(methodName, r.getDuccId(), "Cannot store reservation:", e); + } finally { + h.commit(); + h.close(); + } + + } + + // Save to history only + public void saveReservation(IDuccWorkReservation r) + throws Exception + { + saveReservationInternal(r, true, DbCategory.History); + } + + public void 
saveReservationUnsafe(IDuccWorkReservation r) + throws Exception + { + saveReservationInternal(r, false, DbCategory.History); + } + + private IDuccWorkReservation restoreReservationInternal(DbHandle h, OrientVertex v) + throws Exception + { + String methodName = "restoreReservationInternal"; + IDuccWorkReservation r = null; + + ODocument d = v.getRecord(); + String json = d.toJSON(); + JsonObject jo = mkJsonObject(json); + + Gson g = mkGsonForJob(); + logger.info(methodName, null, g.toJson(jo)); + + r = g.fromJson(jo, DuccWorkReservation.class); + + List<JdReservationBean> l = r.getJdReservationBeanList(); + if ( l != null ) { + for (JdReservationBean b : l ) { + ConcurrentHashMap<DuccId, SizeBytes> map = b.getMap(); + for ( DuccId k : map.keySet() ) { + logger.info(methodName, null, "REST ===> " + k.getFriendly() + " " + k.getUnique() + " : " + map.get(k)); + } + } + } + + IDuccReservationMap rm = r.getReservationMap(); // seems to get set by default when job is recovered + Iterable<Edge> ed = v.getEdges(Direction.OUT, DbEdge.JpProcess.pname()); + for ( Edge e : ed ) { + OrientEdge oe = (OrientEdge) e; + OrientVertex ov = oe.getVertex(Direction.IN); + + ODocument pd = ov.getRecord(); + String pjson = pd.toJSON(); + + IDuccReservation rr = g.fromJson(pjson, DuccReservation.class); + rm.addReservation(rr); + } + + // Now must hack around becase this 'n that and JSON can't work out some things + String ct = (String) ((ADuccWork)r).getCompletionTypeObject(); + r.setCompletionType(ReservationCompletionType.valueOf(ct)); + + String so = (String) ((ADuccWork)r).getStateObject(); + r.setReservationState(ReservationState.valueOf(so)); + + return r; + } + + /** + * Part of history management, recover ths indicated reservation from history. 
+ */ + public IDuccWorkReservation restoreReservation(long duccid) + throws Exception + { + DuccWorkReservation ret = null; + DbHandle h = null; + try { + h = dbManager.open(); + Iterable<Vertex> q = h.select("SELECT * FROM " + DbVertex.Reservation.pname() + " WHERE ducc_dbid=" + duccid + + " AND " + DbConstants.DUCC_DBCAT +"='" + DbCategory.History.pname() + "'"); + for ( Vertex v : q ) { + // There's only 1 unless db is broken. + return restoreReservationInternal(h, (OrientVertex) v); + } + } finally { + h.close(); + } + + return ret; + } + + /** + * Part of history management, recover ths indicated reservations from history. + */ + public ArrayList<IDuccWorkReservation> restoreReservations(long max) + throws Exception + { + ArrayList<IDuccWorkReservation> ret = new ArrayList<IDuccWorkReservation>(); + DbHandle h = null; + try { + h = dbManager.open(); + Iterable<Vertex> q = h.select("SELECT * FROM " + DbVertex.Reservation.pname() + + " WHERE " + DbConstants.DUCC_DBCAT + "='" + DbCategory.History.pname() + "'" + + " ORDER BY ducc_dbid DESC LIMIT "+ max); + for ( Vertex v : q ) { + IDuccWorkReservation j = restoreReservationInternal(h, (OrientVertex) v); + ret.add(j); + } + } finally { + h.close(); + } + + return ret; + } + // End of reservations section + // ---------------------------------------------------------------------------------------------------- + + + // ---------------------------------------------------------------------------------------------------- + // Services section + // public void serviceSave(IDuccWorkService s) + // throws Exception + // { + // } + + public void saveService(IDuccWorkService s) + throws Exception + { + saveJobInternal((IDuccWorkJob)s, DbVertex.ServiceInstance, true, DbCategory.History); + } + + public void saveServiceUnsafe(IDuccWorkService s) + throws Exception + { + saveJobInternal((IDuccWorkJob)s, DbVertex.ServiceInstance, false, DbCategory.History); + } + + + /** + * Part of history management, recover ths indicated 
service instance from history. + */ + public IDuccWorkService restoreService(long duccid) + throws Exception + { + DbHandle h = null; + try { + h = dbManager.open(); + Iterable<Vertex> q = h.select("SELECT * FROM " + DbVertex.ServiceInstance.pname() + " WHERE ducc_dbid=" + duccid + + " AND " + DbConstants.DUCC_DBCAT + "='" + DbCategory.History.pname() + "'"); + for ( Vertex v : q ) { + return restoreJobInternal(h, (OrientVertex) v); + } + } finally { + h.close(); + } + + return null; + } + + /** + * Part of history management, recover ths indicated service instances from history. + */ + public ArrayList<IDuccWorkService> restoreServices(long max) + throws Exception + { + ArrayList<IDuccWorkService> ret = new ArrayList<IDuccWorkService>(); + DbHandle h = null; + try { + h = dbManager.open(); + Iterable<Vertex> q = h.select("SELECT * FROM " + DbVertex.ServiceInstance.pname() + + " WHERE " + DbConstants.DUCC_DBCAT +"='" + DbCategory.History.pname() + "'" + + " ORDER BY ducc_dbid DESC LIMIT "+ max); + for ( Vertex v : q ) { + IDuccWorkService j = restoreJobInternal(h, (OrientVertex) v); + ret.add(j); + } + } finally { + h.close(); + } + + return ret; + + } + // End of services section + // ---------------------------------------------------------------------------------------------------- + + // ---------------------------------------------------------------------------------------------------- + // Orchecstrator Checkpoint save and restore. We save as discrete objects (unlike file-based checkpoint) + // so they can be included in queries. + + /** + * Orchestrator checkpoint, save the live orchestrator state to the database. + * + * @param work A map of all 'live' work, Jobs, Reservations, APs, Service instances + * @param processToJob Maps each specific process to the controlling work (job, reservation, ap, service instance) + * + * @TODO Do we even need processToJob? Can't it be derived from the work map? This question needs to be + * resolved by the owner of the OR. 
For now we just do it. + */ + public boolean checkpoint(DuccWorkMap work, Map<DuccId, DuccId> processToJob) + throws Exception + { + String methodName = "checkpoint"; + boolean ret = true; + + DbHandle h = null; + try { + h = dbManager.open(); + } catch ( Exception e ) { + logger.warn(methodName, null, "Cannot open database.", e); + if ( h != null ) h.close(); + return false; + } + + // We transactionally delete the old checkpoint, and then save the new one. If something gows wrong we + // rollback and thus don't lose stuff. In theory. + try { + h.execute("DELETE VERTEX V where " + DbConstants.DUCC_DBCAT + "='" + DbCategory.Checkpoint.pname() + "'"); + Map<DuccId, IDuccWork> map = work.getMap(); + for (IDuccWork w : map.values() ) { + switch(w.getDuccType()) { + case Job: + saveJobNoCommit(h, (IDuccWorkJob) w, DbVertex.Job, DbCategory.Checkpoint); + break; + case Service: + saveJobNoCommit(h, (IDuccWorkJob) w, DbVertex.ServiceInstance, DbCategory.Checkpoint); + break; + case Reservation: + saveReservationNoCommit(h, (IDuccWorkReservation) w, DbCategory.Checkpoint); + break; + default: + break; + } + } + + Gson g = mkGsonForJob(); + ProcessToJobList l = new ProcessToJobList(processToJob); + String json = g.toJson(l, l.getClass()); + // logger.info(methodName, null, "ProcessToJob:", json); + h.saveObject(DbVertex.ProcessToJob, null, json, DbCategory.Checkpoint); + h.commit(); + } catch ( Exception e ) { + if ( h != null ) h.rollback(); + logger.error(methodName, null, "Cannot save ProcessToJob map", e); + ret = false; + } finally { + if ( h != null ) h.close(); + if ( ret ) logger.info(methodName, null, "Saved Orchestrator Checkpoint"); + } + return ret; + } + + /** + * Orchestrator checkpoint. Restore the checkpoint from the DB. Caller must initialize + * empty maps, which we fill in. 
+ */ + public boolean restore(DuccWorkMap work, Map<DuccId, DuccId> processToJob) + throws Exception + { + String methodName = "restore"; + DbHandle h = null; + boolean ret = true; + try { + h = dbManager.open(); + // Select all the "top level" objects ith DUCC_LIVE=true. When they get restored the + // attached object will get collected. + + Iterable<Vertex> q = h.select("SELECT * FROM V WHERE (" + + "@CLASS ='" + DbVertex.Job.pname() + + "' OR " + + "@CLASS ='" + DbVertex.Reservation.pname() + + "' OR " + + "@CLASS ='" + DbVertex.ServiceInstance.pname() + + "') AND " + DbConstants.DUCC_DBCAT + "='" + DbCategory.Checkpoint.pname() + "'"); + + IDuccWork w = null; + for ( Vertex v : q ) { + String l = ((OrientVertex) v).getLabel(); + if ( l.equals(DbVertex.Job.pname()) || l.equals(DbVertex.ServiceInstance.pname()) ) { + w = restoreJobInternal(h, (OrientVertex) v); + } else if ( l.equals(DbVertex.Reservation.pname()) ) { + w = restoreReservationInternal(h, (OrientVertex) v); + } else { + logger.warn(methodName, null, "Unexpected record of type", l, "in checkpoint restore."); + } + + work.addDuccWork(w); + } + + q = h.select("SELECT FROM " + DbVertex.ProcessToJob.pname()); + + int count = 0; + for ( Vertex vv : q ) { + if ( count > 1 ) { + logger.error(methodName, null, "Oops - we have multiple ProcessToJob records. 
Using the first one but it may be wrong."); + break; + } + + OrientVertex v = (OrientVertex) vv; + ODocument d = v.getRecord(); + String json = d.toJSON(); + logger.info(methodName, null, json); + + Gson g = mkGsonForJob(); + + ProcessToJobList l = g.fromJson(json, ProcessToJobList.class); + l.fill(processToJob); + } + + } catch ( Exception e ) { + logger.error(methodName, null, "Error restoring checkpoint:", e); + ret = false; + } finally { + if ( h != null ) h.close(); + } + + + return ret; + } + + // End of OR checkpoint save and restore + // ---------------------------------------------------------------------------------------------------- + + // ---------------------------------------------------------------------------------------------------- + // Stuff common to everything + JsonObject mkJsonObject(String json) + { + // This method lets us munge the json before using it, if we need to + JsonParser parser = new JsonParser(); + JsonObject jobj = parser.parse(json).getAsJsonObject(); + + return jobj; + } + + public void shutdown() + { + dbManager.shutdown(); + } + + // End of common + // ---------------------------------------------------------------------------------------------------- + + + // ---------------------------------------------------------------------------------------------------- + // Instance creators and adaptors for GSON + // ---------------------------------------------------------------------------------------------------- + + // We need these for the DuccNode and NodeIdentity because they don't have no-arg + // Constructors. 
+ // + // @TODO after merge, consult with Jerry about adding in those constructors + private class NodeInstanceCreator implements InstanceCreator<Node> { + public Node createInstance(Type type) { + // System.out.println("DuccNode"); + return new DuccNode(null, null, false); + } + } + + private class NodeIdentityCreator implements InstanceCreator<NodeIdentity> { + public NodeIdentity createInstance(Type type) { + // System.out.println("DuccNodeIdentity"); + try { return new NodeIdentity(null, null); } catch ( Exception e ) {} + return null; + } + } + + /** + * JSON helper for our complex objects. Gson doesn't save type information in the json so + * it doesn't know how to construct things declared as interfaces. + * + * This class is a Gson adapter that saves the actual object type in the json on serialization, + * and uses that information on deserialization to construct the right thing. + */ + private class GenericInterfaceAdapter + implements + JsonSerializer<Object>, + JsonDeserializer<Object> + { + + private static final String DUCC_META_CLASS = "DUCC_META_CLASS"; + + @Override + public Object deserialize(JsonElement jsonElement, + Type type, + JsonDeserializationContext jsonDeserializationContext) + throws JsonParseException + { + // Reconstitute the "right" class based on the actual class it came from as + // found in metadata + JsonObject obj = jsonElement.getAsJsonObject(); + JsonElement clElem= obj.get(DUCC_META_CLASS); + + if ( clElem== null ) { + throw new IllegalStateException("Cannot determine concrete class for " + type + ". 
Must register explicit type adapter for it."); + } + String clName = clElem.getAsString(); + + //System.out.println("----- elem: " + clName + " clElem: " + obj); + try { + Class<?> clz = Class.forName(clName); + return jsonDeserializationContext.deserialize(jsonElement, clz); + } catch (ClassNotFoundException e) { + throw new JsonParseException(e); + } + } + + @Override + public JsonElement serialize(Object object, + Type type, + JsonSerializationContext jsonSerializationContext) + { + // Add the mete element indicating what kind of concrete class is this came from + //String n = object.getClass().getCanonicalName(); + //System.out.println("**** Serialize object A " + n + " of type " + type); + //if ( n.contains("Concurrent") ) { + // int stop = 1; + // stop++; + //} + + JsonElement ele = jsonSerializationContext.serialize(object, object.getClass()); + //System.out.println("**** Serialize object B " + object.getClass().getCanonicalName() + " of type " + type + " : ele " + ele); + ele.getAsJsonObject().addProperty(DUCC_META_CLASS, object.getClass().getCanonicalName()); + return ele; + } + } + + @SuppressWarnings("unused") + private class DuccTypeFactory + implements TypeAdapterFactory + { + + public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> typeToken) + { + //System.out.println("TYPETOKEN: " + typeToken + " raw type: " + typeToken.getRawType().getName()); + Class<?> cl = typeToken.getRawType(); + //System.out.println(" Canonical name: " + cl.getCanonicalName()); + Type type = typeToken.getType(); + if ( typeToken.getRawType() != ConcurrentHashMap.class ) { + //System.out.println("Skipping type " + typeToken); + return null; + } + + if ( type instanceof ParameterizedType ) { + + ParameterizedType pt = (ParameterizedType) type; + Type[] types = pt.getActualTypeArguments(); + //for ( Type tt : types ) { + // System.out.println(" TYPE ARGUMENTS: " + tt); + //} + Type tt = types[0]; + Class<?> cll = (Class<?>) tt; + + } + return null; + } + + } + + 
@SuppressWarnings("unused") + private class MapAdaptor + extends TypeAdapter<ConcurrentHashMap<DuccId, Long>> + { + + public void write(JsonWriter out, ConcurrentHashMap<DuccId, Long> map) throws IOException { + System.out.println("***************** Writing"); + if (map == null) { + out.nullValue(); + return; + } + + out.beginArray(); + for (DuccId k : map.keySet() ) { + out.beginObject(); + out.value(k.getFriendly()); + out.value(k.getUnique()); + out.value(map.get(k)); + out.endObject(); + } + out.endArray(); + } + + public ConcurrentHashMap<DuccId, Long> read(JsonReader in) throws IOException { + System.out.println("***************** reading"); + if (in.peek() == JsonToken.NULL) { + in.nextNull(); + return null; + } + + ConcurrentHashMap<DuccId, Long> ret = new ConcurrentHashMap<DuccId, Long>(); + in.beginArray(); + while (in.hasNext()) { + in.beginObject(); + Long friendly = in.nextLong(); + String unique = in.nextString(); + + Long val = in.nextLong(); + in.endObject(); + DuccId id = new DuccId(friendly); + id.setUUID(UUID.fromString(unique)); + ret.put(id, val); + } + in.endArray(); + return ret; + } + } + +} Added: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/ProcessToJobList.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/ProcessToJobList.java?rev=1703203&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/ProcessToJobList.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/ProcessToJobList.java Tue Sep 15 14:31:24 2015 @@ -0,0 +1,41 @@ +package org.apache.uima.ducc.database; + +import java.util.ArrayList; +import java.util.Map; + +import org.apache.uima.ducc.common.utils.id.DuccId; + + +/** + * This is a helper to serialize the ProcessToJob 
map in the OR checkpoint. We need this because + * the key for that map is complex (a DuccId) and it can't be propertly serialized into a JSON + * dictionary key. + * + */ +class ProcessToJobList +{ + ArrayList<PjPair> l = new ArrayList<PjPair>(); + + ProcessToJobList() {}; + ProcessToJobList(Map<DuccId, DuccId> m) + { + for ( DuccId k : m.keySet() ) { + l.add(new PjPair(k, m.get(k))); + } + } + + void fill(Map<DuccId, DuccId> ptj) + { + for ( PjPair p : l ) ptj.put(p.k, p.v); + } + + static private class PjPair + { + DuccId k; + DuccId v; + + PjPair(DuccId k, DuccId v) { this.k = k; this.v = v; } + + } + +} Added: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java?rev=1703203&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java Tue Sep 15 14:31:24 2015 @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. +*/ +package org.apache.uima.ducc.database; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.uima.ducc.common.persistence.services.IStateServices; +import org.apache.uima.ducc.common.persistence.services.StateServicesDirectory; +import org.apache.uima.ducc.common.persistence.services.StateServicesSet; +import org.apache.uima.ducc.common.utils.DuccLogger; +import org.apache.uima.ducc.common.utils.DuccProperties; +import org.apache.uima.ducc.common.utils.id.DuccId; +import org.apache.uima.ducc.database.DbConstants.DbCategory; +import org.apache.uima.ducc.database.DbConstants.DbVertex; + +public class StateServicesDb + implements IStateServices +{ + private DuccLogger logger = null; + private DbManager dbManager; + + public StateServicesDb() + { + } + + private boolean init(String dburl) + throws Exception + { + // log4j issue - the caller must pass in a logger configured correctly for the component + // to which it belongs or it won't get sent to the right appender. + + boolean ret = false; + try { + dbManager = new DbManager(dburl, logger); + dbManager.init(); + ret = true; + } catch ( Exception e ) { + // logger.error("StateServicesDb", null, "Cannot open the service database:", e); + throw e; + } + return ret; + } + + public boolean init(DuccLogger logger) + throws Exception + { + this.logger = logger; + String stateUrl = System.getProperty("ducc.state.database.url"); + return init(stateUrl); + } + + /** + * Helper for restoring service registrations. This looks in the non-history part of the DB. 
+ */ + private List<Long> getListOfType(DbVertex type) + { + String methodName = "getSvcList"; + + if ( dbManager == null ) { + logger.error(methodName, null, "Service database is not initialized."); + return new ArrayList<Long>(); // avoid NPE in caller + } + + List<Long> ret = null; + DbHandle h = null; + try { + h = dbManager.open(); + ret = h.listObjectsOfType(type, DbCategory.SmReg); + } catch (Throwable e) { + logger.error(methodName, null, e); + } finally { + if ( h != null ) h.close(); + } + + return ret; + } + + /** + * Return a list of service property file names. Must query the db every time. + */ + public List<Long> getSvcList() + { + return getListOfType(DbVertex.ServiceReg); + } + + /** + * Return a list of sersvice meta file names. Must query the db every time. + */ + public List<Long> getMetaList() + { + return getListOfType(DbVertex.ServiceMeta); + } + + /** + * This is adapted from the file-based version and as such, perhaps should be named better. + * + * This reads the entire (live, non-history) service registration into an object called + * StateServicesDirectory. This in turn contains a map of StateServicesSet objects. Each + * StateServiceSet contains two properties files, one for the submitted properties, and one + * for the service meta properties (SM state). 
+ */ + public StateServicesDirectory getStateServicesDirectory() + throws Exception + { + String methodName = "getStateServicesDirectory"; + StateServicesDirectory ret = new StateServicesDirectory(); + + if ( dbManager== null ) { + logger.error(methodName, null, "Service database is not initialized."); + return ret; // avoid NPE in caller + } + + DbHandle h = dbManager.open(); + try { + Map<Long, Properties> svcset = h.getPropertiesForType(DbVertex.ServiceReg , DbCategory.SmReg); + Map<Long, Properties> metaset = h.getPropertiesForType(DbVertex.ServiceMeta, DbCategory.SmReg); + + for ( Long k : svcset.keySet() ) { + logger.trace(methodName, null, "Handling key", k); + DuccProperties sp = new DuccProperties(svcset.get(k)); + DuccProperties mp = new DuccProperties(metaset.get(k)); + StateServicesSet sss = new StateServicesSet(); + sss.put(svc, sp); + sss.put(meta, mp); + + ret.put(k, sss); + } + } finally { + if ( h != null ) h.close(); + } + + return ret; + } + + /** + * Save the src and meta propeties into the non-history part of the DB. + * + * @param serviceID The SM-assigned duccid for the service registration. + * @param svc_props The "user-submitted" properties set defining the service. + * @param meta-props The SM-generated properties contain service state + * @param safe This is for the loader. If 'true', then don't do anything if + * there is already something in the DB for the service. If 'false + * just blindly put it into the DB. + * @NOTE The OrientDb SQL has a create-or-modify primitive. Is there something + * equivalent in the Java interface? If so, we should modify this to use it + * and can then eliminate the 'safe' flag. 
+ */ + boolean storePropertiesInternal (DuccId serviceId, Properties svc_props, Properties meta_props, boolean safe) + { + String methodName = "storePropertiesInternal"; + DbHandle h = null; + + try { + h = dbManager.open(); + + Long id = serviceId.getFriendly(); + if ( safe ) { + if ( h.thingInDatabase(id, DbVertex.ServiceReg, DbCategory.SmReg) ) { + return false; + } + } + + h.createPropertiesObject(svc_props, DbVertex.ServiceReg, id, DbCategory.SmReg); + h.createPropertiesObject(meta_props, DbVertex.ServiceMeta, id, DbCategory.SmReg); + h.commit(); + return true; + } catch ( Exception e ) { + logger.error(methodName, serviceId, "ROLLBACK: ", e); + if ( h != null ) h.rollback(); + return false; + } finally { + if ( h != null ) h.close(); + } + } + + /** + * Save the props into the database, don't check to see if they're there already. Used by the + * loader for converting old registries to the db. + */ + public boolean storePropertiesUnsafe(DuccId serviceId, Properties svc_props, Properties meta_props) + { + return storePropertiesInternal(serviceId, svc_props, meta_props, false); + } + + /** + * Save the props into the db. If the object exists don't overwrite it, and return an error. + * The only non-error return is if the object doesn't already exist, and it is safely committed. + * + * This is used by the SM on initial service registration only. + */ + public boolean storeProperties(DuccId serviceId, Properties svc_props, Properties meta_props) + { + return storePropertiesInternal(serviceId, svc_props, meta_props, true); + } + + + /** + * The registration is removed, move it to the history part of the DB. 
+ */ + public void moveToHistory(DuccId serviceId, Properties job_props, Properties meta_props) + { + // All we need to do is re-sync the final properties, and be sure to set DUCC_HISTORY to false + String methodName = "moveToHistory"; + Long id = serviceId.getFriendly(); + DbHandle h = null; + try { + + h = dbManager.open(); // get new connection from the pool + h.syncProperties(job_props, DbVertex.ServiceReg, id, DbCategory.History); + h.syncProperties(meta_props, DbVertex.ServiceMeta, id, DbCategory.History); + h.commit(); + } catch ( Exception e ) { + logger.error(methodName, serviceId, "ROLLBACK: ", e); + if ( h != null ) h.rollback(); + } finally { + if ( h != null ) h.close(); + } + } + + /** + * Helper method to Update the indicated properties file, in the non-history part of the db. + * This is most often called by SM to update the service meta after state changes. + * + * @param serviceId The SM-assigned DUCC ID for the service registration. + * @param props The properties file to save. Usually it's just the meta but if + * the service is being modified, it could also be the registration. + * @param type The type enum, ususally Service or ServiceMeta. + */ + private boolean updateProperties(DuccId serviceId, Properties props, DbVertex type) + { + String methodName = "updatePropeties"; + DbHandle h = null; + try { + h = dbManager.open(); + h.syncProperties(props, type, serviceId.getFriendly(), DbCategory.SmReg); + h.commit(); + return true; + } catch ( Exception e ) { + logger.error(methodName, serviceId, "ROLLBACK:", e); + if ( h != null ) h.rollback(); + return false; + } finally { + if ( h != null ) h.close(); + } + } + + /** + * Update the service registration. + */ + public boolean updateJobProperties(DuccId serviceId, Properties props) + { + return updateProperties(serviceId, props, DbVertex.ServiceReg); + } + + /** + * Update the service meta data. 
*/
    public boolean updateMetaProperties(DuccId serviceId, Properties props)
    {
        return updateProperties(serviceId, props, DbVertex.ServiceMeta);
    }

    /**
     * Close and discard the database connection pool.
     */
    public void shutdown()
    {
        dbManager.shutdown();
    }
}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/misc/DbQuery.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/misc/DbQuery.java?rev=1703203&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/misc/DbQuery.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/misc/DbQuery.java Tue Sep 15 14:31:24 2015 @@ -0,0 +1,74 @@
package org.apache.uima.ducc.database.misc;

import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.database.DbHandle;
import org.apache.uima.ducc.database.DbManager;

import com.orientechnologies.orient.core.record.impl.ODocument;
import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.blueprints.impls.orient.OrientVertex;

/**
 * Toy orientdb query tool: lists the ducc_dbid of every VJOB record.
 */

public class DbQuery
{
    DuccLogger logger = DuccLogger.getLogger(DbQuery.class, "JRC");

    DbManager dbManager;
    String dburl = "remote:localhost/ToyHistory";
    // String dburl = "plocal:/users/challngr/DuccHistory/ToyHistory";

    String jobHistory = System.getProperty("user.home") + "/ducc_runtime/history/jobs";
    // String jobHistory = "/home/ducc/ducc_runtime/history/jobs";

    /**
     * Open the database manager for 'dburl'.  A failure is logged, not rethrown.
     */
    public DbQuery()
        throws Exception
    {
        try {
            dbManager = new DbManager(dburl, logger);
            dbManager.init();
        } catch ( Exception e ) {
            logger.error("HisstoryManagerDb", null, "Cannot open the history database:", e);
        }

    }

    /**
     * Shut down the database manager, if one was created.
     */
    public void closeDb()
    {
        // The constructor swallows failures, so the manager may never have been created.
        if ( dbManager != null ) dbManager.shutdown();
    }


    /**
     * Run the query and log every id found, along with the time the query took.
     */
    public void run()
        throws Exception
    {
        String methodName = "run";

        long now = System.currentTimeMillis();
        DbHandle h = dbManager.open();
        try {
            Iterable<Vertex> q =  h.select("SELECT ducc_dbid FROM VJOB");
            logger.info(methodName, null, "TIme to execute query:", System.currentTimeMillis() - now);
            for ( Vertex v : q ) {
                ODocument d = ((OrientVertex)v).getRecord();
                Long did = d.field("ducc_dbid");
                logger.info(methodName, null, "ID:", did);
            }
        } finally {
            // Close the handle whether or not the query worked
            if ( h != null ) h.close();
        }
    }


    public static void main(String[] args)
    {
        DbQuery dbl = null;
        try {
            dbl = new DbQuery();
            dbl.run();
        } catch ( Exception e ) {
            e.printStackTrace();
        } finally {
            if ( dbl != null ) dbl.closeDb();
        }
    }

}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/misc/Server.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/misc/Server.java?rev=1703203&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/misc/Server.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/misc/Server.java Tue Sep 15 14:31:24 2015 @@ -0,0 +1,74 @@
package org.apache.uima.ducc.database.misc;

import java.io.File;
import java.io.FileInputStream;

import com.orientechnologies.orient.server.OServer;
import com.orientechnologies.orient.server.OServerMain;

/**
 * Starts an OrientDB server in this process using a configuration read from a fixed file.
 */
public class Server
{
    String config = null;
    OServer server = null;

    /**
     * Read the whole server configuration file into 'config'.
     */
    public Server()
        throws Exception
    {
        File f = new File("/home/challngr/ducc_runtime/resources/database.xml");
        int len = (int) f.length();
        byte[] buf = new byte[len];
        FileInputStream fis = new FileInputStream(f);
        try {
            // A single read() may return fewer bytes than asked for; loop until the buffer
            // is full or the stream ends.
            int off = 0;
            while ( off < len ) {
                int n = fis.read(buf, off, len - off);
                if ( n < 0 ) break;
                off += n;
            }
            config = new String(buf, 0, off);
        } finally {
            fis.close();
        }
    }

    /**
     * Create the server, start it with the loaded configuration, and activate it.
     */
    public void run()
        throws Exception
    {

        server = OServerMain.create();
        server.startup( config );
        server.activate();

        /**
           "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>"
           + "<orient-server>"
           + "<network>"
           + "<protocols>"
           + "<protocol name=\"binary\" implementation=\"com.orientechnologies.orient.server.network.protocol.binary.ONetworkProtocolBinary\"/>"
           + "<protocol name=\"http\" implementation=\"com.orientechnologies.orient.server.network.protocol.http.ONetworkProtocolHttpDb\"/>"
           + "</protocols>"
           + "<listeners>"
           + "<listener ip-address=\"0.0.0.0\" port-range=\"2424-2430\" protocol=\"binary\"/>"
           + "<listener ip-address=\"0.0.0.0\" port-range=\"2480-2490\" protocol=\"http\"/>"
           + "</listeners>"
           + "</network>"
           + "<users>"
           + "<user name=\"root\" password=\"asdfasdf\" resources=\"*\"/>"
           + "</users>"
           + "<properties>"
           + "<entry name=\"orientdb.www.path\" value=\"C:/work/dev/orientechnologies/orientdb/releases/1.0rc1-SNAPSHOT/www/\"/>"
           + "<entry name=\"orientdb.config.file\" value=\"C:/work/dev/orientechnologies/orientdb/releases/1.0rc1-SNAPSHOT/config/orientdb-server-config.xml\"/>"
           + "<entry name=\"server.cache.staticResources\" value=\"false\"/>"
           + "<entry name=\"log.console.level\" value=\"info\"/>"
           + "<entry name=\"log.file.level\" value=\"fine\"/>"
           + "<entry value=\"/home/challngr/ducc_runtime/database/DuccHistory\" name=\"server.database.path\" />"
           //The following is required to eliminate an error or warning "Error on resolving property: ORIENTDB_HOME"
           + "<entry name=\"plugin.dynamic\" value=\"false\"/>"
           + "</properties>" + "</orient-server>");
        */

    }

    public static void main(String[] args)
        throws Exception
    {
        try {
            Server s = new Server();
            s.run();
        } catch ( Exception e ) {
            e.printStackTrace();
        }
    }

}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/misc/Shutdown.java URL:
/**
 * Placeholder for a stand-alone database shutdown driver.
 *
 * Constructing an instance publishes the location of the server configuration file as a
 * JVM system property.  {@link #run()} currently performs no work.
 */
public class Shutdown
{
    /** Name of the system property set by the constructor. */
    private static final String CONFIG_PROPERTY = "orientdb.config.file";

    /** Value stored under {@link #CONFIG_PROPERTY}. */
    private static final String CONFIG_FILE = "/home/challngr/ducc_runtime/resources/database.xml";

    /**
     * Records the configuration file location in the system properties.
     */
    public Shutdown()
        throws Exception
    {
        System.setProperty(CONFIG_PROPERTY, CONFIG_FILE);
    }

    /**
     * Does nothing at present; the shutdown call itself is still commented out.
     */
    public void run()
        throws Exception
    {
        //OServerShutdownMain();
    }

    /**
     * Command-line entry point: builds a Shutdown and runs it, printing the stack trace of
     * any failure instead of propagating it.
     */
    public static void main(String[] args)
        throws Exception
    {
        try {
            new Shutdown().run();
        } catch ( Exception failure ) {
            failure.printStackTrace();
        }
    }
}
org.apache.uima.ducc.database.DbConstants.DbVertex; +import org.apache.uima.ducc.database.DbHandle; +import org.apache.uima.ducc.database.DbLoader; +import org.apache.uima.ducc.database.DbManager; + +/** + * Toy orientdb loader to load a service registry into OrientDb + */ + +public class SmLoader +{ + DuccLogger logger = DuccLogger.getLogger(DbLoader.class, "DB"); + + DbManager dbManager; + String dburl = "remote:localhost/DuccState"; + + //String jobHistory = System.getProperty("user.home") + "/ducc_runtime/history/jobs"; + + // Must sync these two! + String serviceState = "/home/ducc/ducc_runtime/state/services"; + DbCategory dbcat = DbCategory.SmReg; + + public SmLoader() + throws Exception + { + try { + dbManager = new DbManager(dburl, logger); + dbManager.init(); + } catch ( Exception e ) { + logger.error("SmLoader", null, "Cannot open the state database:", e); + } + + } + + public void closeDb() + { + dbManager.shutdown(); + } + + public void restoreService(long id) + { + String methodName = "restoreService"; + // DbHandle h = null; + // boolean showJson = false; + + logger.info(methodName, null, "Restoring Service", "Not implemented", id); + + } + + static int storeCount = 0; + public void saveService(long id, Properties svc_props, Properties meta_props) + throws IOException + { + String methodName = "saveService"; + long now = System.currentTimeMillis(); + + + DbHandle h = null; + try { + + String cps = (String) svc_props.remove("classpath"); + //String ping_cps = (String) svc_props.remove("service_ping_classpath"); + h = dbManager.open(); + Object svc = h.createPropertiesObject(svc_props, DbVertex.ServiceInstance, id, dbcat); + Object meta = h.createPropertiesObject(meta_props, DbVertex.ServiceMeta, id, dbcat); + h.addEdge(svc, meta, DbEdge.ServiceMeta); + + if ( cps != null ) { + Properties cp_props = new Properties(); + cp_props.put("classpath", cps); + Object cp = h.createPropertiesObject(cp_props, DbVertex.Classpath, id, dbcat); + h.addEdge(svc, cp, 
DbEdge.Classpath); + } + + h.commit(); + logger.info(methodName, null, id, "---------> Time to save Service:", System.currentTimeMillis() - now); + storeCount++; + } catch ( Exception e ) { + if ( h != null ) h.rollback(); + logger.error(methodName, null, id, "Cannot store service registration:", e); + } finally { + if ( h != null ) h.close(); + } + + } + + public void run() + { + String methodName = "run"; + int max = 0; + //ArrayList<Long> ids = new ArrayList<Long>(); + int c = 0; + File dir = new File(serviceState); + DbHandle h = null; + try { + for ( File f : dir.listFiles() ) { + String s = f.toString(); + if ( s.endsWith(".svc") ) { + logger.info(methodName, null, "Loading file", c++, ":", f); + //IDuccWorkJob job = null; + try { + Properties svc_props = new Properties(); + Properties meta_props = new Properties(); + + String[] dirparts = s.split("/"); + String[] parts = dirparts[dirparts.length-1].split("\\."); + long id = 0; + try { + id = Long.parseLong(parts[0]); + } catch (NumberFormatException ee ) { + logger.error(methodName, null, "File", f, "does not appear to be a valid registry file. 
Skipping."); + continue; + } + + FileInputStream svcin = new FileInputStream(f); + FileInputStream metain = new FileInputStream(s.replace("svc", "meta")); + + svc_props.load(svcin); + meta_props.load(metain); + svcin.close(); + metain.close(); + + saveService(id, svc_props, meta_props); + } catch(Exception e) { + logger.info(methodName, null, e); + } finally { + // restoreJob(job.getDuccId().getFriendly()); + if ( (max > 0) && (++c > max) ) break; + } + } + } + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } finally { + if ( h != null ) h.close(); + logger.info(methodName, null, "Stored", storeCount, "objects."); + } + // Now read them back + //for ( long l : ids ) { + // restoreJob(l); + // } + } + + + public static void main(String[] args) + { + SmLoader dbl = null; + try { + dbl = new SmLoader(); + dbl.run(); + } catch ( Exception e ) { + e.printStackTrace(); + } finally { + if ( dbl != null ) dbl.closeDb(); + } + } + +}