Author: challngr Date: Tue Dec 1 19:21:52 2015 New Revision: 1717503 URL: http://svn.apache.org/viewvc?rev=1717503&view=rev Log: UIMA-4577 Add RmLoad table and support.
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IDbJob.java Modified: uima/sandbox/uima-ducc/trunk/src/main/resources/cassandra.yaml uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java Modified: uima/sandbox/uima-ducc/trunk/src/main/resources/cassandra.yaml URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/resources/cassandra.yaml?rev=1717503&r1=1717502&r2=1717503&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/src/main/resources/cassandra.yaml (original) +++ uima/sandbox/uima-ducc/trunk/src/main/resources/cassandra.yaml Tue Dec 1 19:21:52 2015 @@ -108,12 +108,12 @@ partitioner: org.apache.cassandra.dht.Mu # the configured compaction strategy. # If not set, the default directory is $CASSANDRA_HOME/data/data. 
data_file_directories: - - DUCC/data + - ../state/database/data # commit log. when running on magnetic HDD, this should be a # separate spindle than the data directories. # If not set, the default directory is $CASSANDRA_HOME/data/commitlog. -commitlog_directory: DUCC/commitlog +commitlog_directory: ../state/database/commitlog # policy for data disk failures: # die: shut down gossip and client transports and kill the JVM for any fs errors or @@ -228,7 +228,7 @@ counter_cache_save_period: 7200 # saved caches # If not set, the default directory is $CASSANDRA_HOME/data/saved_caches. -saved_caches_directory: DUCC/saved_caches +saved_caches_directory: ../state/database/saved_caches # commitlog_sync may be either "periodic" or "batch." # @@ -274,7 +274,7 @@ seed_provider: parameters: # seeds is actually a comma-delimited list of addresses. # Ex: "<ip1>,<ip2>,<ip3>" - - seeds: "DUCC_HEAD" + - seeds: "bluej538" # For workloads with more data than can fit in memory, Cassandra's # bottleneck will be reads that need to fetch data from @@ -387,7 +387,7 @@ ssl_storage_port: 7001 # you can specify which should be chosen using listen_interface_prefer_ipv6. If false the first ipv4 # address will be used. If true the first ipv6 address will be used. Defaults to false preferring # ipv4. If there is only one address it will be selected regardless of ipv4/ipv6. -listen_address: DUCC_HEAD +listen_address: bluej538 # listen_interface: eth0 # listen_interface_prefer_ipv6: false @@ -445,7 +445,7 @@ start_rpc: true # you can specify which should be chosen using rpc_interface_prefer_ipv6. If false the first ipv4 # address will be used. If true the first ipv6 address will be used. Defaults to false preferring # ipv4. If there is only one address it will be selected regardless of ipv4/ipv6. 
-rpc_address: DUCC_HEAD +rpc_address: bluej538 # rpc_interface: eth1 # rpc_interface_prefer_ipv6: false Added: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IDbJob.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IDbJob.java?rev=1717503&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IDbJob.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IDbJob.java Tue Dec 1 19:21:52 2015 @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+*/ + + +package org.apache.uima.ducc.common.persistence.rm; + + +public interface IDbJob +{ + + public String getClassName(); + public long getFriendlyId(); + public String getUserName(); + public int getMemory(); + public String getShortType(); + public int queryDemand(); + public int countOccupancy(); + public String getState(); + public int getShareOrder(); +} Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java?rev=1717503&r1=1717502&r2=1717503&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java Tue Dec 1 19:21:52 2015 @@ -177,6 +177,21 @@ public interface IRmPersistence public Map<String, Map<String, Object>> getAllMachines() throws Exception; /** + * A new job arrives (or is recovered after restart). + */ + public void addJob(IDbJob j) throws Exception; + + /** + * A job has left the system forever. + */ + public void deleteJob(IDbJob j) throws Exception; + + /** + * How many shares do I want from the scheduler? + */ + public void updateDemand(IDbJob j) throws Exception; + + /** * Shutdown the connection to the DB; * */ @@ -347,6 +362,67 @@ public interface IRmPersistence } + /** + * This table lists jobs in the system. 
+ */ + enum RmLoad + implements IDbProperty + { + TABLE_NAME { + public String pname() { return "rmload"; } + public Type type() { return Type.String; } + public boolean isPrivate() { return true;} + public boolean isMeta() { return true;} + }, + + Class { + public String pname() { return "class"; } + public Type type() { return Type.String; } + }, + + JobId { + public String pname() { return "job_id"; } + public Type type() { return Type.Long; } + public boolean isPrimaryKey() { return true; } + }, + + User { + public String pname() { return "user"; } + public Type type() { return Type.String; } + }, + + Memory { + public String pname() { return "memory"; } + public Type type() { return Type.Integer; } + }, + + State { + public String pname() { return "state"; } + public Type type() { return Type.String; } + }, + + Demand { + public String pname() { return "demand"; } + public Type type() { return Type.Integer; } + }, + + Occupancy { + public String pname() { return "occupancy"; } + public Type type() { return Type.Integer; } + }, + + JobType { + public String pname() { return "jobtype"; } + public Type type() { return Type.String; } + }; + + public boolean isPrimaryKey() { return false; } + public boolean isPrivate() { return false; } + public boolean isMeta() { return false; } + public String columnName() { return pname(); } + public boolean isIndex() { return false; } + + } } Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java?rev=1717503&r1=1717502&r2=1717503&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java (original) +++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java Tue Dec 1 19:21:52 2015 @@ -25,7 +25,9 @@ import java.util.Properties; import org.apache.uima.ducc.common.utils.DuccLogger; import org.apache.uima.ducc.common.utils.id.DuccId; - +/** + * This class allows a persistence object to be created even if none is configured, without crashes or NPEs. + */ public class NullRmStatePersistence implements IRmPersistence { @@ -46,4 +48,7 @@ public class NullRmStatePersistence impl public void updateShare(String node, DuccId shareid, DuccId jobid, long investment, String state, long init_time, long pid) {} public Properties getMachine(String id) { return null; } public Map<String, Map<String, Object>> getAllMachines() { return new HashMap<String, Map<String, Object>>(); } + public void addJob(IDbJob j ) {} + public void deleteJob(IDbJob j ) {} + public void updateDemand(IDbJob j) {} } Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java?rev=1717503&r1=1717502&r2=1717503&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java Tue Dec 1 19:21:52 2015 @@ -133,10 +133,10 @@ public class DbCreate return false; } Metadata metadata = cluster.getMetadata(); - logger.info(methodName, null, "Connected to cluster:", metadata.getClusterName()); + doLog(methodName, "Connected to cluster:", metadata.getClusterName()); for ( Host host : metadata.getAllHosts() ) { - logger.info(methodName, null, "Datatacenter:", host.getDatacenter(), "Host:", 
host.getAddress(), "Rack:", host.getRack()); + doLog(methodName, "Datatacenter:", host.getDatacenter(), "Host:", host.getAddress(), "Rack:", host.getRack()); } session = cluster.connect(); return true; Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java?rev=1717503&r1=1717502&r2=1717503&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java Tue Dec 1 19:21:52 2015 @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import org.apache.uima.ducc.common.persistence.rm.IDbJob; import org.apache.uima.ducc.common.persistence.rm.IDbShare; import org.apache.uima.ducc.common.persistence.rm.IRmPersistence; import org.apache.uima.ducc.common.utils.DuccLogger; @@ -48,7 +49,11 @@ public class RmStatePersistence DuccLogger logger = null; static final String RM_NODE_TABLE = RmNodes.TABLE_NAME.pname(); static final String RM_SHARE_TABLE = RmShares.TABLE_NAME.pname(); + static final String RM_LOAD_TABLE = RmLoad.TABLE_NAME.pname(); + // Prepared statements to manage the RmNodes table + + // Prepared statements to manage the RmShares table PreparedStatement shareAddPrepare = null; PreparedStatement shareDelPrepare = null; PreparedStatement updateFixedPrepare = null; @@ -56,6 +61,11 @@ public class RmStatePersistence PreparedStatement updateEvictedPrepare = null; PreparedStatement updateSharePrepare = null; + // Prepared statements to manage the RmLoad table + PreparedStatement addJobPrepare = null; + PreparedStatement deleteJobPrepare = null; + 
PreparedStatement updateDemandPrepare = null; + public RmStatePersistence() { } @@ -99,6 +109,11 @@ public class RmStatePersistence updatePurgedPrepare = h.prepare("UPDATE " + RM_SHARE_TABLE + " SET purged = ? WHERE node = ? AND ducc_dbid = ? and job_id = ?"); updateEvictedPrepare = h.prepare("UPDATE " + RM_SHARE_TABLE + " SET evicted = ? WHERE node = ? AND ducc_dbid = ? and job_id = ?"); updateSharePrepare = h.prepare("UPDATE " + RM_SHARE_TABLE + " SET investment = ?, state = ?, init_time = ?, pid = ? WHERE node = ? AND ducc_dbid = ? and job_id = ?"); + + // An upsert + addJobPrepare = h.prepare("UPDATE " + RM_LOAD_TABLE + " SET class = ?, user = ?, memory = ?, jobtype = ? WHERE job_id = ?"); + deleteJobPrepare = h.prepare("DELETE FROM " + RM_LOAD_TABLE + " WHERE job_id=?"); + updateDemandPrepare = h.prepare("UPDATE " + RM_LOAD_TABLE + " SET demand = ?, occupancy = ?, state = ? WHERE job_id=?"); } public void close() @@ -116,6 +131,7 @@ public class RmStatePersistence h = dbManager.open(); h.execute("TRUNCATE " + RM_NODE_TABLE); h.execute("TRUNCATE " + RM_SHARE_TABLE); + h.execute("TRUNCATE " + RM_LOAD_TABLE); } catch ( Exception e ) { logger.error(methodName, null, "Cannot clear the database.", e); } @@ -144,6 +160,15 @@ public class RmStatePersistence ret.add(new SimpleStatement(s)); } + buf = new StringBuffer("CREATE TABLE IF NOT EXISTS " + RM_LOAD_TABLE + " ("); + buf.append(DbUtil.mkSchema(RmLoad.values())); + buf.append(")"); + ret.add(new SimpleStatement(buf.toString())); + indexes = DbUtil.mkIndices(RmShares.values(), RM_SHARE_TABLE); + for ( String s : indexes ) { + ret.add(new SimpleStatement(s)); + } + return ret; } @@ -260,7 +285,30 @@ public class RmStatePersistence } return ret; } - + + public void addJob(IDbJob j) + throws Exception + { + DbHandle h = dbManager.open(); + h.execute(addJobPrepare, j.getClassName(), j.getUserName(), j.getMemory(), j.getShortType(), j.getFriendlyId()); + } + + public void deleteJob(IDbJob j) + throws Exception + { + 
DbHandle h = dbManager.open(); + h.execute(deleteJobPrepare, j.getFriendlyId()); + } + + public void updateDemand(IDbJob j) + throws Exception + { + DbHandle h = dbManager.open(); + // queryDemand returns the number of processes wanted by the job, of the job's memory size + // The occupancy is converted from qshares to nshares (processes) for the db. + h.execute(updateDemandPrepare, j.queryDemand(), (j.countOccupancy() / j.getShareOrder()), j.getState(), j.getFriendlyId()); + } + public static void main(String[] args) { } Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java?rev=1717503&r1=1717502&r2=1717503&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java Tue Dec 1 19:21:52 2015 @@ -951,6 +951,7 @@ public class JobManagerConverter localMap.addDuccWork(l); // still schedulable, and we already know about it, just sync the state + scheduler.signalState(l.getDuccId(), l.getStateObject().toString()); switch ( l.getDuccType() ) { case Job: jobUpdate(r.getStateObject(), l); Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java?rev=1717503&r1=1717502&r2=1717503&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java (original) +++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java Tue Dec 1 19:21:52 2015 @@ -21,6 +21,7 @@ package org.apache.uima.ducc.rm.schedule import java.util.HashMap; import java.util.Map; +import org.apache.uima.ducc.common.persistence.rm.IDbJob; import org.apache.uima.ducc.common.utils.id.DuccId; import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType; @@ -31,7 +32,8 @@ import org.apache.uima.ducc.transport.ev public interface IRmJob extends SchedConstants, - IEntity + IEntity, + IDbJob { /** @@ -41,9 +43,9 @@ public interface IRmJob public DuccId getId(); - public String getShortType(); // S, R, M, J - service reservation managed-reservation, job + // public String getShortType(); IDbJob UIMA-4577 // S, R, M, J - service reservation managed-reservation, job - public long getFriendlyId(); + // public long getFriendlyId(); UIMA 4577 public String getName(); public void setJobName(String name); @@ -74,7 +76,9 @@ public interface IRmJob public boolean isReservation(); // ask ... public boolean setInitWait(boolean w); // When set, job cap is set low, waiting for confirmation that init is ok. - // Returns the prev state. + // Returns the prev state + + public void setState(String state); // UIMA-4577 Information only, for the db. getState() is in IDbJob; /** * Used during scheduling cycle only, keep track of number of shares given out to this job. @@ -88,7 +92,7 @@ public interface IRmJob /** * For queries - how many processes do I want in a perfect world? */ - public int queryDemand(); + // public int queryDemand(); to IDbJob UIMA-4577 /** * Eviction policies, configurable. @@ -220,7 +224,7 @@ public interface IRmJob * Scheduler looks at job memory and decides what its share order is. 
*/ public void setShareOrder(int s); - public int getShareOrder(); + public int getShareOrder(); // IDbJob UIMA-4577 /** * This returns the largest number that can actually be used, which will be either the @@ -230,7 +234,7 @@ public interface IRmJob public void initJobCap(); // calculate the cap at start of cycle and cache it // because it is frequently used - public String getUserName(); + // public String getUserName(); // UIMA 4577 IDbJob public void setUserName(String n); public User getUser(); @@ -242,7 +246,7 @@ public interface IRmJob public int getUserPriority(); public void setUserPriority(int p); - public String getClassName(); + // public String getClassName(); UIMA 4577 IDbJob public void setClassName(String n); public int getSchedulingPriority(); @@ -257,7 +261,7 @@ public interface IRmJob public int nThreads(); public void setThreads(int threads); - public int getMemory(); + // public int getMemory(); UIMA 4577 IDbJob public void setMemory(int memory); /** @@ -283,7 +287,7 @@ public interface IRmJob // Total number of shares to account to me - either actually assigned, or // counted afresh in the current scheduling cycle, for allotments - public int countOccupancy(); // UIMA-4275 + // public int countOccupancy(); // UIMA-4275 moved to IDbJob by UIMA-4577 // UIMA-4275 Must lose some number of shares unconditionally public void shrinkBy(int howmany); Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java?rev=1717503&r1=1717502&r2=1717503&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java (original) +++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java Tue Dec 1 19:21:52 2015 @@ -50,6 +50,7 @@ public interface ISchedulerMain void signalCompletion(DuccId id); void signalInitialized(IRmJob id); void signalCompletion(IRmJob job, Share share); + void signalState(DuccId jobid, String state); //void signalGrowth(DuccId jobid, Share share); String getDefaultFairShareName(); Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java?rev=1717503&r1=1717502&r2=1717503&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java Tue Dec 1 19:21:52 2015 @@ -573,7 +573,7 @@ public class ResourceClass { int sum = 0; for ( IRmJob j : allJobs.values() ) { - sum += (j.countOccupancy() * j.getShareOrder()); // in quantum shares UIMA-4275 + sum += (j.countOccupancy()); // in quantum shares UIMA-4275 } return sum; } Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java?rev=1717503&r1=1717502&r2=1717503&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java Tue Dec 1 19:21:52 2015 @@ -45,6 +45,7 @@ public class RmJob protected 
DuccId id; // sched-assigned id (maybe delegate to job manager eventually) protected DuccType ducc_type; // for messages so we can tell what kind of job + protected String state = "New"; // UIMA-4577 info only, for the db protected boolean arbitrary_process = false; // Is this an AP? protected String name; // user's name for job protected String resource_class_name; // Name of the res class, from incoming job parms @@ -124,6 +125,7 @@ public class RmJob orchestrator_epoch = SystemPropertyResolver.getIntProperty("ducc.orchestrator.state.publish.rate", 10000); rm_rate = SystemPropertyResolver.getIntProperty("ducc.rm.state.publish.ratio", 4); ducc_epoch = orchestrator_epoch * rm_rate; + } // public RmJob(DuccId id, Properties properties) @@ -169,7 +171,10 @@ public class RmJob { this.name = name; } - + + public void setState(String state) { this.state = state; } + public String getState() { return this.state; } + public void setReservation() { this.is_reservation = true; Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java?rev=1717503&r1=1717502&r2=1717503&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java Tue Dec 1 19:21:52 2015 @@ -1051,6 +1051,11 @@ public class Scheduler prclass.addJob(j); j.setResourceClass(prclass); + try { + persistence.addJob(j); + } catch (Exception e) { + logger.warn(methodName, j.getId(), "Cannot persist new job in database:", e); + } logger.info(methodName, j.getId(), "submit", j.toString()); } @@ -1060,6 +1065,14 @@ public class Scheduler schedulers[i].schedule(upd); } + for ( 
IRmJob j : allJobs.values() ) { // UIMA-4577 persist 'demand' + try { + persistence.updateDemand(j); + } catch (Exception e) { + logger.warn(methodName, j.getId(), "Cannot update demand in database:", e); + } + } + logger.info(methodName, null, "--------------- Scheduler returns ---------------"); logger.info(methodName, null, "\n", upd.toString()); logger.info(methodName, null, "------------------------------------------------"); @@ -1434,6 +1447,14 @@ public class Scheduler return ret; } + public synchronized void signalState(DuccId jobid, String state) + { + IRmJob j = allJobs.get(jobid); + if ( j != null ) { // might not be here yet, we'll get it later + j.setState(state); + } + } + /** * Callback from job manager, need shares for a new fair-share job. */ @@ -1526,6 +1547,12 @@ public class Scheduler String methodName = "processCompletion"; logger.info(methodName, job.getId(), "Job completes."); + try { + persistence.deleteJob(job); // UIMA-4577 + } catch (Exception e) { + logger.warn(methodName, job.getId(), "Cannot delete job from database:", e); + } + // -- clean up the running jobs list IRmJob j = allJobs.remove(job.getId()); if ( j == null ) { @@ -1683,6 +1710,11 @@ public class Scheduler logger.info(methodName, j.getId(), "Recovered job:", j.toString()); logger.info(methodName, j.getId(), "Recovered shares:", sharenames.toString()); + try { + persistence.addJob(j); + } catch (Exception e) { + logger.warn(methodName, j.getId(), "Cannot persist recovered job in database:", j); + } // After a reconfig/restart the share may be in the wrong place, in which case it // needs to be removed. We have to wait until it is fully hooked into the structures // before scheduling for removal because it could take a while to go away and