Author: challngr
Date: Tue Mar 19 14:31:02 2013
New Revision: 1458311

URL: http://svn.apache.org/r1458311
Log:
UIMA-2667 Rewrite the 'takeFromTheRich' routine for defragmentation. Also fix a one-line bug in the counting method apportion_qshares: a loop terminated incorrectly, causing a divisor to go negative, and hence class/user/job counts to go negative, under high load.
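The apportion_qshares fix is the one-line guard in the first hunk below. The following is a minimal, self-contained sketch of the failure mode, not DUCC code: Entity, wantsMore() and give() are hypothetical stand-ins, and only getShareWeight() and the allweights bookkeeping mirror the real method.

    import java.util.Iterator;
    import java.util.List;

    // Hypothetical reduction of the apportion_qshares loop shape.
    class ApportionSketch
    {
        interface Entity
        {
            int     getShareWeight();
            boolean wantsMore();
            void    give(int shares);
        }

        static void apportion(List<Entity> entities, int sharesToGive)
        {
            int allweights = 0;
            for ( Entity e : entities ) allweights += e.getShareWeight();

            while ( sharesToGive > 0 && !entities.isEmpty() ) {
                // Remove entities that have everything they want; each removal
                // shrinks the divisor.
                for ( Iterator<Entity> it = entities.iterator(); it.hasNext(); ) {
                    Entity e = it.next();
                    if ( !e.wantsMore() ) {
                        allweights -= e.getShareWeight();
                        it.remove();
                    }
                }
                // The one-line fix: once everybody has dropped out, allweights is 0,
                // and with a loop that overshoots it goes negative; dividing by it
                // below drives the class/user/job counts negative. Bail out instead.
                if ( allweights <= 0 ) break;

                int perWeight = Math.max(1, sharesToGive / allweights);
                for ( Entity e : entities ) {
                    int n = Math.min(sharesToGive, perWeight * e.getShareWeight());
                    e.give(n);
                    sharesToGive -= n;
                }
            }
        }
    }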
Modified:
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java
    uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java?rev=1458311&r1=1458310&r2=1458311&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java Tue Mar 19 14:31:02 2013
@@ -459,6 +459,7 @@ public class NodepoolScheduler
                 allweights -= e.getShareWeight();
             }
         }
+        if ( allweights <= 0 ) break;    // JRC JRC
     }

     // Remove entities that have everything they want
@@ -1529,113 +1530,137 @@ public class NodepoolScheduler
      * ALL the user's jobs, we will have culled everything that makes no sense to
      * take from in the caller.
      *
-     * @return THe number of processes recovered.
+     * @return The number of processes we found space for. Note this could be different from the number
+     *         of processes evicted, if it took more than one eviction to make space. Also, we may have
+     *         evicted a process smaller than is needed, because there was already some free space on
+     *         the machine.
      */
     int takeFromTheRich(IRmJob nj, int needed, TreeMap<User, User> users_by_wealth, HashMap<User, TreeMap<IRmJob, IRmJob>> jobs_by_user)
     {
         String methodName = "takeFromTheRich";
-        int given = 0;
-
-        logger.debug(methodName, nj.getId(), "needed[", needed, "]");
+        // 1. Collect all machines that have shares which, if evicted, would make enough space
+        //    - in a compatible NP
+        //    - g + sum(shares belonging to rich users on the machine);
+        // 2. Order the machines by
+        //    a) richest user
+        //    b) largest machine
+        // 3. Pick the next machine,
+        //    - clear enough shares
+        //    - remove the machine from the list
+        //    - update wealth
+        // 4. Repeat at 2 until
+        //    a) have given what is needed
+        //    b) nothing left to give
+
+        // Map<Share, Share> exemptShares = new HashMap<Share, Share>();    // not eligible for various reasons
+        Map<IRmJob, IRmJob> candidateJobs = new HashMap<IRmJob, IRmJob>();
+        Map<Machine, Machine> eligibleMachines = new TreeMap<Machine, Machine>(new EligibleMachineSorter());

-        TreeMap<IRmJob, IRmJob> job_set = new TreeMap<IRmJob, IRmJob>(new JobByShareSorter());   // the collection of rich users jobs to take from
-        Map<Share, Share> shares = new TreeMap<Share, Share>(new FinalEvictionSorter());
-        Map<Share, Share> removed = new HashMap<Share, Share>();
-
-        List<User> allUsers = new ArrayList<User>();       // for debug
-
-        for ( User next_user : users_by_wealth.keySet() ) {
-            job_set.putAll(jobs_by_user.get(next_user));   // on each iter, the set of jobs with candidates grows
-
-            allUsers.add(next_user);
-            logger.debug(methodName, nj.getId(), "Donating users:", allUsers);
-            logger.debug(methodName, nj.getId(), "Donating jobs:", listJobSet(job_set));
-
-            Map<IRmJob, IRmJob> donorJobs = new HashMap<IRmJob, IRmJob>();
-            List<Share> donorShares = new ArrayList<Share>();
-            boolean shares_found = false;
-            do {
-                shares_found = false;
-                IRmJob rich_j = job_set.firstKey();        // de rrrichest kind
-
-                logger.debug(methodName, nj.getId(), "Inspecting job", rich_j.getId());
-                //
-                // First lets see if something is pending and we can just reassign it. Nobody knows
-                // about this share but RM yet, so it's safe to do this.
-                //
-                shares.putAll(rich_j.getPendingShares());  // each new job makes the candidate set richer
-                shares.putAll(rich_j.getAssignedShares());
-                removed.putAll(rich_j.getPendingRemoves());
-
-                for ( Share s : shares.keySet() ) {
+        for ( TreeMap<IRmJob, IRmJob> jobs : jobs_by_user.values() ) {
+            candidateJobs.putAll(jobs);
+        }

-                    if ( removed.containsKey(s) ) continue;   // already gone, ignore it
-
-                    if ( ! compatibleNodepools(s, nj) ) {
-                        logger.trace(methodName, nj.getId(), "Bypassing pending share", s.toString(), "becaose of incompatible nodepool");
-                        continue;
-                    }
-                    logger.trace(methodName, nj.getId(), "Pending share", s.toString(), "is compatible with class", nj.getResourceClass().getName());
+        int given = 0;
+        int orderNeeded = nj.getShareOrder();
+
+        ResourceClass cl = nj.getResourceClass();
+        String npname = cl.getNodepoolName();
+        NodePool np = globalNodepool.getSubpool(npname);
+        Map<Node, Machine> machines = np.getAllMachines();   // everything here is a candidate, nothing else is
+
+        for ( Machine m : machines.values() ) {
+            if ( m.getShareOrder() < orderNeeded ) {
+                // logger.debug(methodName, nj.getId(), "Bypass ", m.getId(), ": too small for request of order", orderNeeded);
+                continue;
+            }
+            Map<Share, Share> as = m.getActiveShares();
+            int g = m.getVirtualShareOrder();
+            for ( Share s : as.values() ) {
+                IRmJob j = s.getJob();
+                if ( s.isForceable() && candidateJobs.containsKey(j) ) {
+                    g += j.getShareOrder();
+                }
+            }
+            if ( g >= orderNeeded ) {
+                logger.trace(methodName, nj.getId(), "Candidate machine:", m.getId());
+                eligibleMachines.put(m, m);
+            } else {
+                // (a) the share is not forceable (non-preemptable, or already being removed), or
+                // (b) the share is not owned by a rich job
+                logger.trace(methodName, nj.getId(), "Not a candidate, insufficient rich jobs:", m.getId());
+            }
+        }
+        logger.debug(methodName, nj.getId(), "Found", eligibleMachines.size(), "machines to be searched in this order:");
+        for ( Machine m : eligibleMachines.keySet() ) {
+            logger.debug(methodName, nj.getId(), "Eligible machine:", m.getId());
+        }
+        // first part done
+
+        // Now just bop through the machines until either we can't find anything, or we find everything.
+        int given_per_round = 0;
+        do {
+            int g = 0;
+            for ( Machine m : eligibleMachines.keySet() ) {
+                HashMap<Share, Share> sh = m.getActiveShares();
+                g = m.getVirtualShareOrder();
+                List<Share> potentialShares = new ArrayList<Share>();
+                for ( Share s : sh.values() ) {
+                    IRmJob j = s.getJob();
+                    User u = j.getUser();

-                    Machine m = s.getMachine();
-                    // clear everything from this machine from jobs in the current job set
-                    if ( m.getShareOrder() < nj.getShareOrder() ) {
-                        logger.debug(methodName, nj.getId(), "Bypassing pending share", s.toString(), "because machine size[", m.getShareOrder(), "< job order[", nj.getShareOrder());
-                        continue;                          // job will never fit here
-                    }
-                    Map<Share, Share> activeShares = m.getActiveShares();   // shares on this machine
-                    List<Share> candidates = new ArrayList<Share>();
-                    int total_available = m.getVirtualShareOrder();         // start with free space
-
-                    logger.debug(methodName, nj.getId(), "Machine", m.getId(), "activeShares", activeShares.keySet(), "total_available", total_available);
-                    for ( Share as : activeShares.values() ) {
-                        IRmJob tentative = as.getJob();
-                        if ( job_set.containsKey(tentative) ) {   // share belong to a rich j?
-                            donorJobs.put(tentative, tentative);
-                            candidates.add(as);                      // yes it's a candidate
-                            total_available += as.getShareOrder();   // add in shares that might work
-                            //logger.debug(methodName, nj.getId(), "share", as.getId(), "in job", tentative.getId(), "is a candidate");
-                        } else {
-                            //logger.debug(methodName, nj.getId(), "share", as.getId(), "in job", tentative.getId(), "is not a candidate");
-                        }
-                    }
-
-                    int nj_shares = total_available / nj.getShareOrder();   // figure how many nj-sized shares we can get
-                    nj_shares = Math.min(needed, nj_shares);                // cap on actual needed
-                    logger.debug(methodName, nj.getId(), "Machine", m.getId(), "total_available", total_available, "nj_shares", nj_shares);
-                    if ( nj_shares > 0 ) {                 // if it works, pull candidates until we have enough
-                        int g = m.getVirtualShareOrder();
-                        for ( Share cs : candidates ) {
-                            if ( clearShare(cs, nj) ) removed.put(cs, cs);   // return TRUE if evicted, and FALSE if reassigned to nj
-                            User u = rich_j.getUser();
-                            u.subtractWealth(s.getShareOrder());
-                            donorShares.add(cs);
-                            g += cs.getShareOrder();
-                            shares_found = true;
-                            if ( (g / nj.getShareOrder() ) >= nj_shares ) {
-                                break;
+                    if ( s.isForceable() ) {
+                        TreeMap<IRmJob, IRmJob> potentialJobs = jobs_by_user.get(u);
+                        if ( (potentialJobs != null) && ( potentialJobs.containsKey(j) ) ) {
+                            g += s.getShareOrder();
+                            if ( s.getShareOrder() == orderNeeded ) {
+                                potentialShares.add(0, s);    // exact matches first
+                            } else {
+                                potentialShares.add(s);
                            }
                        }
                    }
-                        given += nj_shares;
-                        needed -= nj_shares;
-                        if ( needed == 0 ) return given;
+                    if ( g >= orderNeeded ) break;
                }
+
+                if ( g >= orderNeeded ) {
+                    // found enough on this machine for 1 share!
+                    logger.debug(methodName, nj.getId(), "Clearing shares: g[", g, "], orderNeeded[", orderNeeded, "]");
+                    g = m.getVirtualShareOrder();    // reset
+                    for ( Share s : potentialShares ) {
+                        IRmJob j = s.getJob();
+                        User u = j.getUser();
+
+                        g += s.getShareOrder();
+                        given_per_round++;
+                        clearShare(s, nj);
+                        u.subtractWealth(s.getShareOrder());
+                        logger.debug(methodName, nj.getId(), "Clearing share", s, "order[", s.getShareOrder(),
+                                     "]: g[", g, "], orderNeeded[", orderNeeded, "]");
+                        if ( g >= orderNeeded ) break;    // inner loop, could break on exact match without giving everything away
+                    }
+                    break;    // outer loop, if anything was found
+                }
+            }

-                // TODO: This can't happen inside the loop
-                for ( IRmJob j : donorJobs.values() ) {    // rebalance tree on job wealth after possible removal
-                    job_set.remove(j);
-                    job_set.put(j, j);
+            if ( given_per_round > 0 ) {
+                // Must reorder the eligible list to get the "next" best candidate. We could try to remove
+                // machines that were exhausted above ...
+                Map<Machine, Machine> tmp = new HashMap<Machine, Machine>();
+                tmp.putAll(eligibleMachines);
+                eligibleMachines.clear();
+                for ( Machine m : tmp.keySet() ) {
+                    eligibleMachines.put(m, m);
                }
-                for ( Share s : donorShares ) {            // given away, and removed from consideration
-                    shares.remove(s);
-                }
-            } while (shares_found && ( given < needed) );
-        }
+                // and also must track how many processes we made space for
+                given = given + (g / orderNeeded);    // at least one, or else we have a bug
+                logger.debug(methodName, nj.getId(), "LOOPEND: given[", given, "] g[", g, "] orderNeeded[", orderNeeded, "]");
+            }
+        } while ( (given_per_round > 0) && ( given < needed ));
+
        return given;
    }

@@ -2281,31 +2306,70 @@ public class NodepoolScheduler

    // This is a sorter for a tree map so we have to be sure not to return equality unless the objects
    // are the same objects.
    //
-    static private class FinalEvictionSorter
-        implements Comparator<Share>
+//     static private class FinalEvictionSorter
+//         implements Comparator<Share>
+//     {
+//
+//         public int compare(Share s1, Share s2)
+//         {
+//             if ( s1 == s2 ) return 0;
+//
+//             // pending shares first, no point expanding them if we don't have to
+//             if ( s1.isPending() && s2.isPending() ) return -1;
+//             if ( s1.isPending() ) return -1;
+//             if (s2.isPending() ) return 1;
+//
+//             // Shares on machines with more space first, deal with defrag, which is why we're here
+//             int vso1 = s1.getMachine().countFreedUpShares();
+//             int vso2 = s2.getMachine().countFreedUpShares();
+//
+//             if ( vso1 != vso2 ) {
+//                 return vso2 - vso1;    // (more space first)
+//             }
+//
+//             // All else being equal, use investment
+//             int inv = (int) (s1.getInvestment() - s2.getInvestment());
+//             if ( inv == 0 ) return -1;    // careful not to return 0
+//             return inv;
+//         }
+//     }
+
+    //
+    // Sort machines for defrag.
+    //    a) machines with richest users first
+    //    b) largest machine second
+    //
+    static private class EligibleMachineSorter
+        implements Comparator<Machine>
    {
-        public int compare(Share s1, Share s2)
+        public int compare(Machine m1, Machine m2)
        {
-            if ( s1 == s2 ) return 0;
+            if ( m1 == m2 ) return 0;

-            // pending shares first, no point expanding them if we don't have to
-            if ( s1.isPending() && s2.isPending() ) return -1;
-            if ( s1.isPending() ) return -1;
-            if (s2.isPending() ) return 1;
+            int m1wealth = 0;
+            int m2wealth = 0;
+            Map<Share, Share> sh1 = m1.getActiveShares();
+            for ( Share s : sh1.values() ) {
+                IRmJob j = s.getJob();
+                User u = j.getUser();
+                m1wealth = Math.max(m1wealth, u.getShareWealth());
+            }
+
+            Map<Share, Share> sh2 = m2.getActiveShares();
+            for ( Share s : sh2.values() ) {
+                IRmJob j = s.getJob();
+                User u = j.getUser();
+                m2wealth = Math.max(m2wealth, u.getShareWealth());
+            }

-            // Shares on machines with more space first, deal with defrag, which is why we're here
-            int vso1 = s1.getMachine().countFreedUpShares();
-            int vso2 = s2.getMachine().countFreedUpShares();
+            if ( m1wealth != m2wealth ) return m2wealth - m1wealth;    // richest user first

-            if ( vso1 != vso2 ) {
-                return vso2 - vso1;    // (more space first)
-            }
+            long m1mem = m1.getMemory();
+            long m2mem = m2.getMemory();

-            // All else being equal, use investment
-            int inv = (int) (s1.getInvestment() - s2.getInvestment());
-            if ( inv == 0 ) return -1;    // careful not to return 0
-            return inv;
+            if ( m1mem == m2mem ) return -1;    // for tree map, must not return 0 unless same object
+            return (int) (m2mem - m1mem);       // largest machine first
        }
    }

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java?rev=1458311&r1=1458310&r2=1458311&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java Tue Mar 19 14:31:02 2013
@@ -303,16 +303,20 @@ public class Scheduler
      * We only get one such name, so we give up the search if we find
      * it.
      */
+    static String cached_domain = null;
     private String getDomainName()
     {
         String methodName = "getDomainName";
+
+        if ( cached_domain != null ) return cached_domain;

         try {
            NodeIdentity ni = new NodeIdentity();
            for ( IIdentity id : ni.getNodeIdentities()) {
                String n = id.getName();
                int ndx = n.indexOf(".");
                if ( ndx > 0 ) {
-                   return n.substring(ndx + 1);
+                   cached_domain = n.substring(ndx + 1);
+                   return cached_domain;
                }
            }
         } catch (Exception e) {
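The numbered comment at the top of the new takeFromTheRich summarizes the algorithm. Below is a compact sketch of that loop shape under stated assumptions: Box and Slice are hypothetical stand-ins for Machine and Share, and freeUp() abstracts the clearShare() bookkeeping; none of these names are the scheduler's.

    import java.util.ArrayList;
    import java.util.List;

    // Skeleton of the defragmentation loop in steps 1-4 of the takeFromTheRich comment.
    class DefragSketch
    {
        static class Slice { int order; Slice(int o) { order = o; } }

        static class Box
        {
            int freeOrder;                                       // already-free space on the machine
            List<Slice> evictable = new ArrayList<Slice>();      // shares of "rich" jobs

            // Step 3: clear just enough shares to open a region of size 'needed'.
            int freeUp(int needed)
            {
                int g = freeOrder;
                List<Slice> taken = new ArrayList<Slice>();
                for ( Slice s : evictable ) {
                    if ( g >= needed ) break;
                    g += s.order;                                // evicting s adds its space
                    taken.add(s);
                }
                if ( g < needed ) return 0;                      // this machine cannot help
                evictable.removeAll(taken);                      // "evict" them
                freeOrder = g;
                return g / needed;                               // whole processes that now fit
            }
        }

        // Steps 2-4: walk machines (assumed pre-sorted richest-user-first,
        // largest-machine-second) until the request is met or nothing is left.
        static int takeFromTheRich(List<Box> eligible, int needed, int order)
        {
            int given = 0;
            boolean progress = true;
            while ( progress && given < needed ) {
                progress = false;
                for ( Box m : eligible ) {
                    int got = m.freeUp(order);
                    if ( got > 0 ) { given += got; progress = true; break; }
                }
            }
            return given;
        }
    }

The real routine also re-sorts the eligible machines after every successful round, because clearing shares changes user wealth and therefore the EligibleMachineSorter ordering; the sketch keeps the list order fixed for brevity.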
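Both the retired FinalEvictionSorter and the new EligibleMachineSorter carry the same warning: a comparator that feeds a TreeMap used as an ordered set must not return 0 unless the two keys are the same object, or distinct entries silently collapse into one. A small runnable demonstration, with Box as a hypothetical stand-in for Machine:

    import java.util.Comparator;
    import java.util.TreeMap;

    public class TreeMapTieSketch
    {
        static class Box
        {
            final String id; final long memory;
            Box(String id, long memory) { this.id = id; this.memory = memory; }
        }

        public static void main(String[] args)
        {
            // Lossy: returns 0 on a memory tie, so the TreeMap treats the keys as equal.
            Comparator<Box> lossy = new Comparator<Box>() {
                public int compare(Box a, Box b) { return Long.signum(b.memory - a.memory); }
            };
            // Safe for a TreeMap used as an ordered bag: 0 only for the same object.
            Comparator<Box> safe = new Comparator<Box>() {
                public int compare(Box a, Box b)
                {
                    if ( a == b ) return 0;
                    if ( a.memory != b.memory ) return Long.signum(b.memory - a.memory);
                    return -1;    // arbitrary but never 0, so ties don't collapse
                }
            };

            Box a = new Box("a", 64), b = new Box("b", 64);      // tie on memory

            TreeMap<Box, Box> collapsed = new TreeMap<Box, Box>(lossy);
            collapsed.put(a, a); collapsed.put(b, b);

            TreeMap<Box, Box> kept = new TreeMap<Box, Box>(safe);
            kept.put(a, a); kept.put(b, b);

            System.out.println(collapsed.size() + " vs " + kept.size());   // prints "1 vs 2"
        }
    }

Note that the arbitrary -1 tie-break violates the Comparator contract (compare(a, b) and compare(b, a) can both be negative), so lookups and removals by an equal-but-distinct key are unreliable; the patch only ever puts and iterates eligibleMachines, which is why the trick is tolerable there.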
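The Scheduler change is a straightforward memoization: getDomainName() computes the domain once and short-circuits on every later call. The pattern in isolation, with a hypothetical hostNames source standing in for the NodeIdentity lookup:

    // Memoization shape of the getDomainName() change; DomainCacheSketch and
    // hostNames are hypothetical, only the caching idiom mirrors the diff.
    class DomainCacheSketch
    {
        static String cached_domain = null;

        static String getDomainName(Iterable<String> hostNames)
        {
            if ( cached_domain != null ) return cached_domain;   // fast path after first call

            for ( String n : hostNames ) {
                int ndx = n.indexOf(".");
                if ( ndx > 0 ) {                                 // "host.domain" -> "domain"
                    cached_domain = n.substring(ndx + 1);
                    return cached_domain;
                }
            }
            return null;                                         // no qualified name found
        }
    }

The cache field is static and unsynchronized; since the lookup is idempotent, the worst a race can do is compute the same value twice.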