YARN-4359. Update LowCost agents logic to take advantage of YARN-4358. (Jonathan Yaniv and Ishai Menache via Subru).
(cherry picked from commit a3a615eeab8c14ccdc548311097e62a916963dc5) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0eae1c63 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0eae1c63 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0eae1c63 Branch: refs/heads/branch-2 Commit: 0eae1c636862a13d89f23a3af113bab9dec5c651 Parents: 3e7f97f Author: Subru Krishnan <su...@apache.org> Authored: Mon May 1 16:01:07 2017 -0700 Committer: Subru Krishnan <su...@apache.org> Committed: Mon May 1 18:30:58 2017 -0700 ---------------------------------------------------------------------- .../reservation/InMemoryPlan.java | 11 + .../resourcemanager/reservation/PlanView.java | 9 + .../planning/AlignedPlannerWithGreedy.java | 15 +- .../planning/GreedyReservationAgent.java | 13 +- .../reservation/planning/IterativePlanner.java | 196 ++++----- .../reservation/planning/ReservationAgent.java | 23 +- .../planning/SimpleCapacityReplanner.java | 8 +- .../reservation/planning/StageAllocator.java | 10 +- .../planning/StageAllocatorGreedy.java | 4 +- .../planning/StageAllocatorGreedyRLE.java | 4 +- .../planning/StageAllocatorLowCostAligned.java | 279 +++++++++---- .../planning/StageEarliestStart.java | 46 --- .../planning/StageEarliestStartByDemand.java | 106 ----- .../StageEarliestStartByJobArrival.java | 39 -- .../planning/StageExecutionInterval.java | 47 +++ .../StageExecutionIntervalByDemand.java | 144 +++++++ .../StageExecutionIntervalUnconstrained.java | 73 ++++ .../planning/TestAlignedPlanner.java | 413 +++++++++++++++++-- .../planning/TestGreedyReservationAgent.java | 10 +- .../planning/TestSimpleCapacityReplanner.java | 4 +- 20 files changed, 989 insertions(+), 465 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java index 3afcd47..783fd09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java @@ -687,4 +687,15 @@ public class InMemoryPlan implements Plan { readLock.unlock(); } } + + @Override + public RLESparseResourceAllocation getCumulativeLoadOverTime( + long start, long end) { + readLock.lock(); + try { + return rleSparseVector.getRangeOverlapping(start, end); + } finally { + readLock.unlock(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java index 699f461..2767993 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java @@ -174,4 +174,13 @@ public interface PlanView extends PlanContext { public RLESparseResourceAllocation getConsumptionForUserOverTime(String user, long start, long end); + /** + * Get the cumulative load over a time interval. + * + * @param start Start of the time interval. + * @param end End of the time interval. + * @return RLE sparse allocation. + */ + RLESparseResourceAllocation getCumulativeLoadOverTime(long start, long end); + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java index 00c2333..3853f41 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java @@ -39,6 +39,8 @@ public class AlignedPlannerWithGreedy implements ReservationAgent { public static final int DEFAULT_SMOOTHNESS_FACTOR = 10; public static final String SMOOTHNESS_FACTOR = "yarn.resourcemanager.reservation-system.smoothness-factor"; + private boolean allocateLeft = false; + // Log private static final Logger LOG = LoggerFactory @@ -49,26 +51,31 @@ public class AlignedPlannerWithGreedy implements ReservationAgent { // Constructor public AlignedPlannerWithGreedy() { + } @Override public void init(Configuration conf) { int smoothnessFactor = conf.getInt(SMOOTHNESS_FACTOR, DEFAULT_SMOOTHNESS_FACTOR); + allocateLeft = conf.getBoolean(FAVOR_EARLY_ALLOCATION, + DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION); // List of algorithms List<ReservationAgent> listAlg = new LinkedList<ReservationAgent>(); // LowCostAligned planning algorithm ReservationAgent algAligned = - new IterativePlanner(new StageEarliestStartByDemand(), - new StageAllocatorLowCostAligned(smoothnessFactor), false); + new IterativePlanner(new StageExecutionIntervalByDemand(), + new StageAllocatorLowCostAligned(smoothnessFactor, allocateLeft), + allocateLeft); + listAlg.add(algAligned); // Greedy planning algorithm ReservationAgent algGreedy = - new IterativePlanner(new StageEarliestStartByJobArrival(), - new StageAllocatorGreedy(), false); + new IterativePlanner(new StageExecutionIntervalUnconstrained(), + new StageAllocatorGreedyRLE(allocateLeft), allocateLeft); listAlg.add(algGreedy); // Set planner: http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java index 1559b97..637a17b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java @@ -47,9 +47,6 @@ public class GreedyReservationAgent implements ReservationAgent { // Greedy planner private ReservationAgent planner; - public final static String GREEDY_FAVOR_EARLY_ALLOCATION = - "yarn.resourcemanager.reservation-system.favor-early-allocation"; - public final static boolean DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION = true; private boolean allocateLeft; public GreedyReservationAgent() { @@ -57,20 +54,20 @@ public class GreedyReservationAgent implements ReservationAgent { @Override public void init(Configuration conf) { - allocateLeft = conf.getBoolean(GREEDY_FAVOR_EARLY_ALLOCATION, + allocateLeft = conf.getBoolean(FAVOR_EARLY_ALLOCATION, DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION); if (allocateLeft) { LOG.info("Initializing the GreedyReservationAgent to favor \"early\"" + " (left) allocations (controlled by parameter: " - + GREEDY_FAVOR_EARLY_ALLOCATION + ")"); + + FAVOR_EARLY_ALLOCATION + ")"); } else { LOG.info("Initializing the GreedyReservationAgent to favor \"late\"" + " (right) allocations (controlled by parameter: " - + GREEDY_FAVOR_EARLY_ALLOCATION + ")"); 
+ + FAVOR_EARLY_ALLOCATION + ")"); } planner = - new IterativePlanner(new StageEarliestStartByJobArrival(), + new IterativePlanner(new StageExecutionIntervalUnconstrained(), new StageAllocatorGreedyRLE(allocateLeft), allocateLeft); } @@ -123,4 +120,4 @@ public class GreedyReservationAgent implements ReservationAgent { } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java index 24d237a..83f272e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; -import java.util.HashMap; import java.util.HashSet; import java.util.ListIterator; import java.util.Map; @@ -32,26 +31,24 @@ import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; 
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.util.resource.Resources; /** * A planning algorithm consisting of two main phases. The algorithm iterates - * over the job stages in descending order. For each stage, the algorithm: 1. - * Determines an interval [stageArrivalTime, stageDeadline) in which the stage - * is allocated. 2. Computes an allocation for the stage inside the interval. - * - * For ANY and ALL jobs, phase 1 sets the allocation window of each stage to be - * [jobArrival, jobDeadline]. For ORDER and ORDER_NO_GAP jobs, the deadline of - * each stage is set as succcessorStartTime - the starting time of its - * succeeding stage (or jobDeadline if it is the last stage). - * - * The phases are set using the two functions: 1. setAlgEarliestStartTime 2. - * setAlgComputeStageAllocation + * over the job stages in ascending/descending order, depending on the flag + * allocateLeft. For each stage, the algorithm: 1. Determines an interval + * [stageArrival, stageDeadline) in which the stage is allocated. 2. Computes an + * allocation for the stage inside the interval. For ANY and ALL jobs, phase 1 + * sets the allocation window of each stage to be [jobArrival, jobDeadline]. For + * ORDER and ORDER_NO_GAP jobs, the deadline of each stage is set as + * succcessorStartTime - the starting time of its succeeding stage (or + * jobDeadline if it is the last stage). 
The phases are set using the two + * functions: 1. setAlgStageExecutionInterval 2.setAlgStageAllocator */ public class IterativePlanner extends PlanningAlgorithm { @@ -60,7 +57,7 @@ public class IterativePlanner extends PlanningAlgorithm { private RLESparseResourceAllocation planModifications; // Data extracted from plan - private Map<Long, Resource> planLoads; + private RLESparseResourceAllocation planLoads; private Resource capacity; private long step; @@ -70,16 +67,16 @@ public class IterativePlanner extends PlanningAlgorithm { private long jobDeadline; // Phase algorithms - private StageEarliestStart algStageEarliestStart = null; + private StageExecutionInterval algStageExecutionInterval = null; private StageAllocator algStageAllocator = null; private final boolean allocateLeft; // Constructor - public IterativePlanner(StageEarliestStart algEarliestStartTime, + public IterativePlanner(StageExecutionInterval algStageExecutionInterval, StageAllocator algStageAllocator, boolean allocateLeft) { this.allocateLeft = allocateLeft; - setAlgStageEarliestStart(algEarliestStartTime); + setAlgStageExecutionInterval(algStageExecutionInterval); setAlgStageAllocator(algStageAllocator); } @@ -101,12 +98,6 @@ public class IterativePlanner extends PlanningAlgorithm { // Current stage ReservationRequest currentReservationStage; - // Stage deadlines - long stageDeadline = stepRoundDown(reservation.getDeadline(), step); - long successorStartingTime = -1; - long predecessorEndTime = stepRoundDown(reservation.getArrival(), step); - long stageArrivalTime = -1; - // Iterate the stages in reverse order while (stageProvider.hasNext()) { @@ -116,27 +107,17 @@ public class IterativePlanner extends PlanningAlgorithm { // Validate that the ReservationRequest respects basic constraints validateInputStage(plan, currentReservationStage); - // Compute an adjusted earliestStart for this resource - // (we need this to provision some space for the ORDER contracts) + // Set the stageArrival and 
stageDeadline + ReservationInterval stageInterval = + setStageExecutionInterval(plan, reservation, currentReservationStage, + allocations); + Long stageArrival = stageInterval.getStartTime(); + Long stageDeadline = stageInterval.getEndTime(); - if (allocateLeft) { - stageArrivalTime = predecessorEndTime; - } else { - stageArrivalTime = reservation.getArrival(); - if (jobType == ReservationRequestInterpreter.R_ORDER - || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) { - stageArrivalTime = - computeEarliestStartingTime(plan, reservation, - stageProvider.getCurrentIndex(), currentReservationStage, - stageDeadline); - } - stageArrivalTime = stepRoundUp(stageArrivalTime, step); - stageArrivalTime = Math.max(stageArrivalTime, reservation.getArrival()); - } - // Compute the allocation of a single stage + // Compute stage allocation Map<ReservationInterval, Resource> curAlloc = - computeStageAllocation(plan, currentReservationStage, - stageArrivalTime, stageDeadline, user, reservationId); + computeStageAllocation(plan, currentReservationStage, stageArrival, + stageDeadline, user, reservationId); // If we did not find an allocation, return NULL // (unless it's an ANY job, then we simply continue). 
@@ -152,9 +133,13 @@ public class IterativePlanner extends PlanningAlgorithm { } - // Get the start & end time of the current allocation - Long stageStartTime = findEarliestTime(curAlloc); - Long stageEndTime = findLatestTime(curAlloc); + // Validate ORDER_NO_GAP + if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) { + if (!validateOrderNoGap(allocations, curAlloc, allocateLeft)) { + throw new PlanningException( + "The allocation found does not respect ORDER_NO_GAP"); + } + } // If we did find an allocation for the stage, add it for (Entry<ReservationInterval, Resource> entry : curAlloc.entrySet()) { @@ -165,33 +150,6 @@ public class IterativePlanner extends PlanningAlgorithm { if (jobType == ReservationRequestInterpreter.R_ANY) { break; } - - // If ORDER job, set the stageDeadline of the next stage to be processed - if (jobType == ReservationRequestInterpreter.R_ORDER - || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) { - - // CHECK ORDER_NO_GAP - // Verify that there is no gap, in case the job is ORDER_NO_GAP - // note that the test is different left-to-right and right-to-left - if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP - && successorStartingTime != -1 - && ((allocateLeft && predecessorEndTime < stageStartTime) || - (!allocateLeft && (stageEndTime < successorStartingTime)) - ) - || (!isNonPreemptiveAllocation(curAlloc))) { - throw new PlanningException( - "The allocation found does not respect ORDER_NO_GAP"); - } - - if (allocateLeft) { - // Store the stageStartTime and set the new stageDeadline - predecessorEndTime = stageEndTime; - } else { - // Store the stageStartTime and set the new stageDeadline - successorStartingTime = stageStartTime; - stageDeadline = stageStartTime; - } - } } // If the allocation is empty, return an error @@ -200,7 +158,39 @@ public class IterativePlanner extends PlanningAlgorithm { } return allocations; + } + protected static boolean validateOrderNoGap( + RLESparseResourceAllocation allocations, + 
Map<ReservationInterval, Resource> curAlloc, boolean allocateLeft) { + + // Left to right + if (allocateLeft) { + Long stageStartTime = findEarliestTime(curAlloc); + Long allocationEndTime = allocations.getLatestNonNullTime(); + + // Check that there is no gap between stages + if ((allocationEndTime != -1) && (allocationEndTime < stageStartTime)) { + return false; + } + // Right to left + } else { + Long stageEndTime = findLatestTime(curAlloc); + Long allocationStartTime = allocations.getEarliestStartTime(); + + // Check that there is no gap between stages + if ((allocationStartTime != -1) && (stageEndTime < allocationStartTime)) { + return false; + } + } + + // Check that the stage allocation does not violate ORDER_NO_GAP + if (!isNonPreemptiveAllocation(curAlloc)) { + return false; + } + + // The allocation is legal + return true; } protected void initialize(Plan plan, ReservationId reservationId, @@ -223,35 +213,15 @@ public class IterativePlanner extends PlanningAlgorithm { // planLoads are not used by other StageAllocators... 
and don't deal // well with huge reservation ranges - if (this.algStageAllocator instanceof StageAllocatorLowCostAligned) { - planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline); - ReservationAllocation oldRes = plan.getReservationById(reservationId); - if (oldRes != null) { - planModifications = - RLESparseResourceAllocation.merge(plan.getResourceCalculator(), - plan.getTotalCapacity(), planModifications, - oldRes.getResourcesOverTime(), RLEOperator.subtract, - jobArrival, jobDeadline); - } + planLoads = plan.getCumulativeLoadOverTime(jobArrival, jobDeadline); + ReservationAllocation oldRes = plan.getReservationById(reservationId); + if (oldRes != null) { + planLoads = + RLESparseResourceAllocation.merge(plan.getResourceCalculator(), + plan.getTotalCapacity(), planLoads, + oldRes.getResourcesOverTime(), RLEOperator.subtract, jobArrival, + jobDeadline); } - - } - - private Map<Long, Resource> getAllLoadsInInterval(Plan plan, long startTime, - long endTime) { - - // Create map - Map<Long, Resource> loads = new HashMap<Long, Resource>(); - - // Calculate the load for every time slot between [start,end) - for (long t = startTime; t < endTime; t += step) { - Resource load = plan.getTotalCommittedResources(t); - loads.put(t, load); - } - - // Return map - return loads; - } private void validateInputStage(Plan plan, ReservationRequest rr) @@ -286,7 +256,7 @@ public class IterativePlanner extends PlanningAlgorithm { } - private boolean isNonPreemptiveAllocation( + private static boolean isNonPreemptiveAllocation( Map<ReservationInterval, Resource> curAlloc) { // Checks whether a stage allocation is non preemptive or not. 
@@ -329,14 +299,13 @@ public class IterativePlanner extends PlanningAlgorithm { } - // Call algEarliestStartTime() - protected long computeEarliestStartingTime(Plan plan, - ReservationDefinition reservation, int index, - ReservationRequest currentReservationStage, long stageDeadline) { - - return algStageEarliestStart.setEarliestStartTime(plan, reservation, index, - currentReservationStage, stageDeadline); - + // Call setStageExecutionInterval() + protected ReservationInterval setStageExecutionInterval(Plan plan, + ReservationDefinition reservation, + ReservationRequest currentReservationStage, + RLESparseResourceAllocation allocations) { + return algStageExecutionInterval.computeExecutionInterval(plan, + reservation, currentReservationStage, allocateLeft, allocations); } // Call algStageAllocator @@ -350,10 +319,11 @@ public class IterativePlanner extends PlanningAlgorithm { } - // Set the algorithm: algStageEarliestStart - public IterativePlanner setAlgStageEarliestStart(StageEarliestStart alg) { + // Set the algorithm: algStageExecutionInterval + public IterativePlanner setAlgStageExecutionInterval( + StageExecutionInterval alg) { - this.algStageEarliestStart = alg; + this.algStageExecutionInterval = alg; return this; // To allow concatenation of setAlg() functions } @@ -375,7 +345,7 @@ public class IterativePlanner extends PlanningAlgorithm { private final boolean allocateLeft; - private ListIterator<ReservationRequest> li; + private final ListIterator<ReservationRequest> li; public StageProvider(boolean allocateLeft, ReservationDefinition reservation) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java index 52e7055..3c448b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java @@ -29,14 +29,25 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan public interface ReservationAgent { /** + * Constant defining the preferential treatment of time for equally valid + * allocations. + */ + final static String FAVOR_EARLY_ALLOCATION = + "yarn.resourcemanager.reservation-system.favor-early-allocation"; + /** + * By default favor early allocations. + */ + final static boolean DEFAULT_GREEDY_FAVOR_EARLY_ALLOCATION = true; + + /** * Create a reservation for the user that abides by the specified contract - * + * * @param reservationId the identifier of the reservation to be created. 
* @param user the user who wants to create the reservation * @param plan the Plan to which the reservation must be fitted * @param contract encapsulates the resources the user requires for his * session - * + * * @return whether the create operation was successful or not * @throws PlanningException if the session cannot be fitted into the plan */ @@ -45,13 +56,13 @@ public interface ReservationAgent { /** * Update a reservation for the user that abides by the specified contract - * + * * @param reservationId the identifier of the reservation to be updated * @param user the user who wants to create the session * @param plan the Plan to which the reservation must be fitted * @param contract encapsulates the resources the user requires for his * reservation - * + * * @return whether the update operation was successful or not * @throws PlanningException if the reservation cannot be fitted into the plan */ @@ -60,11 +71,11 @@ public interface ReservationAgent { /** * Delete an user reservation - * + * * @param reservationId the identifier of the reservation to be deleted * @param user the user who wants to create the reservation * @param plan the Plan to which the session must be fitted - * + * * @return whether the delete operation was successful or not * @throws PlanningException if the reservation cannot be fitted into the plan */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java index 7507783..7bfc730 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java @@ -42,7 +42,7 @@ import com.google.common.annotations.VisibleForTesting; * This (re)planner scan a period of time from now to a maximum time window (or * the end of the last session, whichever comes first) checking the overall * capacity is not violated. - * + * * It greedily removes sessions in reversed order of acceptance (latest accepted * is the first removed). */ @@ -90,8 +90,8 @@ public class SimpleCapacityReplanner implements Planner { // loop on all moment in time from now to the end of the check Zone // or the end of the planned sessions whichever comes first - for (long t = now; - (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone)); + for (long t = now; + (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone)); t += plan.getStep()) { Resource excessCap = Resources.subtract(plan.getTotalCommittedResources(t), totCap); @@ -102,7 +102,7 @@ public class SimpleCapacityReplanner implements Planner { new TreeSet<ReservationAllocation>(plan.getReservationsAtTime(t)); for (Iterator<ReservationAllocation> resIter = curReservations.iterator(); resIter.hasNext() - && Resources.greaterThan(resCalc, totCap, excessCap, + && Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE);) { ReservationAllocation reservation = resIter.next(); plan.deleteReservation(reservation.getReservationId()); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java index b95f8d4..ec6d9c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java @@ -41,19 +41,21 @@ public interface StageAllocator { * @param planModifications the allocations performed by the planning * algorithm which are not yet reflected by plan * @param rr the stage - * @param stageEarliestStart the arrival time (earliest starting time) set for + * @param stageArrival the arrival time (earliest starting time) set for * the stage by the two phase planning algorithm * @param stageDeadline the deadline of the stage set by the two phase * planning algorithm + * @param user name of the user + * @param oldId identifier of the old reservation * * @return The computed allocation (or null if the stage could not be * allocated) * @throws PlanningException */ Map<ReservationInterval, Resource> computeStageAllocation(Plan plan, - Map<Long, Resource> planLoads, + RLESparseResourceAllocation planLoads, RLESparseResourceAllocation planModifications, 
ReservationRequest rr, - long stageEarliestStart, long stageDeadline, String user, + long stageArrival, long stageDeadline, String user, ReservationId oldId) throws PlanningException; -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java index c836970..da04336 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java @@ -26,8 +26,8 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.util.resource.Resources; @@ -41,7 +41,7 @@ public class StageAllocatorGreedy implements StageAllocator { @Override public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan, - Map<Long, Resource> planLoads, + RLESparseResourceAllocation planLoads, RLESparseResourceAllocation planModifications, ReservationRequest rr, long stageEarliestStart, long stageDeadline, String user, ReservationId oldId) throws PlanningException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java index 5e748fc..ec83e02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedyRLE.java @@ -29,8 +29,8 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.util.resource.Resources; @@ -52,7 +52,7 @@ public class StageAllocatorGreedyRLE implements StageAllocator { @Override public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan, - Map<Long, Resource> planLoads, + RLESparseResourceAllocation planLoads, RLESparseResourceAllocation planModifications, ReservationRequest rr, long stageEarliestStart, long stageDeadline, String user, ReservationId oldId) throws PlanningException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java index b9fd8e1..e45f58c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java @@ -18,8 +18,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; +import java.util.ArrayList; import java.util.Comparator; +import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; import java.util.TreeSet; import org.apache.hadoop.yarn.api.records.ReservationId; @@ -27,46 +31,55 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; /** * A stage allocator that iteratively allocates containers in the * {@link DurationInterval} with lowest overall cost. The algorithm only - * considers intervals of the form: [stageDeadline - (n+1)*duration, - * stageDeadline - n*duration) for an integer n. This guarantees that the - * allocations are aligned (as opposed to overlapping duration intervals). - * - * The smoothnessFactor parameter controls the number of containers that are - * simultaneously allocated in each iteration of the algorithm. + * considers non-overlapping intervals of length 'duration'. This guarantees + * that the allocations are aligned. 
If 'allocateLeft == true', the intervals + * considered by the algorithm are aligned to stageArrival; otherwise, they are + * aligned to stageDeadline. The smoothnessFactor parameter controls the number + * of containers that are simultaneously allocated in each iteration of the + * algorithm. */ public class StageAllocatorLowCostAligned implements StageAllocator { + private final boolean allocateLeft; // Smoothness factor private int smoothnessFactor = 10; // Constructor - public StageAllocatorLowCostAligned() { + public StageAllocatorLowCostAligned(boolean allocateLeft) { + this.allocateLeft = allocateLeft; } // Constructor - public StageAllocatorLowCostAligned(int smoothnessFactor) { + public StageAllocatorLowCostAligned(int smoothnessFactor, + boolean allocateLeft) { + this.allocateLeft = allocateLeft; this.smoothnessFactor = smoothnessFactor; } - // computeJobAllocation() @Override - public Map<ReservationInterval, Resource> computeStageAllocation( - Plan plan, Map<Long, Resource> planLoads, + public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan, + RLESparseResourceAllocation planLoads, RLESparseResourceAllocation planModifications, ReservationRequest rr, - long stageEarliestStart, long stageDeadline, String user, - ReservationId oldId) { + long stageArrival, long stageDeadline, String user, ReservationId oldId) + throws PlanningException { // Initialize ResourceCalculator resCalc = plan.getResourceCalculator(); Resource capacity = plan.getTotalCapacity(); + + RLESparseResourceAllocation netRLERes = plan + .getAvailableResourceOverTime(user, oldId, stageArrival, stageDeadline); + long step = plan.getStep(); // Create allocationRequestsearlies @@ -76,16 +89,15 @@ public class StageAllocatorLowCostAligned implements StageAllocator { // Initialize parameters long duration = stepRoundUp(rr.getDuration(), step); int windowSizeInDurations = - (int) ((stageDeadline - stageEarliestStart) / duration); + (int) ((stageDeadline - stageArrival) / 
duration); int totalGangs = rr.getNumContainers() / rr.getConcurrency(); int numContainersPerGang = rr.getConcurrency(); Resource gang = Resources.multiply(rr.getCapability(), numContainersPerGang); // Set maxGangsPerUnit - int maxGangsPerUnit = - (int) Math.max( - Math.floor(((double) totalGangs) / windowSizeInDurations), 1); + int maxGangsPerUnit = (int) Math + .max(Math.floor(((double) totalGangs) / windowSizeInDurations), 1); maxGangsPerUnit = Math.max(maxGangsPerUnit / smoothnessFactor, 1); // If window size is too small, return null @@ -93,6 +105,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator { return null; } + final int preferLeft = allocateLeft ? 1 : -1; + // Initialize tree sorted by costs TreeSet<DurationInterval> durationIntervalsSortedByCost = new TreeSet<DurationInterval>(new Comparator<DurationInterval>() { @@ -104,23 +118,26 @@ public class StageAllocatorLowCostAligned implements StageAllocator { return cmp; } - return (-1) * Long.compare(val1.getEndTime(), val2.getEndTime()); + return preferLeft + * Long.compare(val1.getEndTime(), val2.getEndTime()); } }); + List<Long> intervalEndTimes = + computeIntervalEndTimes(stageArrival, stageDeadline, duration); + // Add durationIntervals that end at (endTime - n*duration) for some n. 
- for (long intervalEnd = stageDeadline; intervalEnd >= stageEarliestStart - + duration; intervalEnd -= duration) { + for (long intervalEnd : intervalEndTimes) { long intervalStart = intervalEnd - duration; // Get duration interval [intervalStart,intervalEnd) DurationInterval durationInterval = getDurationInterval(intervalStart, intervalEnd, planLoads, - planModifications, capacity, resCalc, step); + planModifications, capacity, netRLERes, resCalc, step, gang); // If the interval can fit a gang, add it to the tree - if (durationInterval.canAllocate(gang, capacity, resCalc)) { + if (durationInterval.canAllocate()) { durationIntervalsSortedByCost.add(durationInterval); } } @@ -139,8 +156,7 @@ public class StageAllocatorLowCostAligned implements StageAllocator { durationIntervalsSortedByCost.first(); int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs); numGangsToAllocate = - Math.min(numGangsToAllocate, - bestDurationInterval.numCanFit(gang, capacity, resCalc)); + Math.min(numGangsToAllocate, bestDurationInterval.numCanFit()); // Add it remainingGangs -= numGangsToAllocate; @@ -148,9 +164,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator { new ReservationInterval(bestDurationInterval.getStartTime(), bestDurationInterval.getEndTime()); - Resource reservationRes = - Resources.multiply(rr.getCapability(), rr.getConcurrency() - * numGangsToAllocate); + Resource reservationRes = Resources.multiply(rr.getCapability(), + rr.getConcurrency() * numGangsToAllocate); planModifications.addInterval(reservationInt, reservationRes); allocationRequests.addInterval(reservationInt, reservationRes); @@ -162,10 +177,10 @@ public class StageAllocatorLowCostAligned implements StageAllocator { DurationInterval updatedDurationInterval = getDurationInterval(bestDurationInterval.getStartTime(), bestDurationInterval.getStartTime() + duration, planLoads, - planModifications, capacity, resCalc, step); + planModifications, capacity, netRLERes, resCalc, step, 
gang); // Add to tree, if possible - if (updatedDurationInterval.canAllocate(gang, capacity, resCalc)) { + if (updatedDurationInterval.canAllocate()) { durationIntervalsSortedByCost.add(updatedDurationInterval); } @@ -180,10 +195,12 @@ public class StageAllocatorLowCostAligned implements StageAllocator { return allocations; } else { - // If we are here is because we did not manage to satisfy this request. - // We remove unwanted side-effect from planModifications (needed for ANY). - for (Map.Entry<ReservationInterval, Resource> tempAllocation - : allocations.entrySet()) { + // If we are here is because we did not manage to satisfy this + // request. + // We remove unwanted side-effect from planModifications (needed for + // ANY). + for (Map.Entry<ReservationInterval, Resource> tempAllocation : allocations + .entrySet()) { planModifications.removeInterval(tempAllocation.getKey(), tempAllocation.getValue()); @@ -196,37 +213,144 @@ public class StageAllocatorLowCostAligned implements StageAllocator { } - protected DurationInterval getDurationInterval(long startTime, long endTime, - Map<Long, Resource> planLoads, + private List<Long> computeIntervalEndTimes(long stageEarliestStart, + long stageDeadline, long duration) { + + List<Long> intervalEndTimes = new ArrayList<Long>(); + if (!allocateLeft) { + for (long intervalEnd = stageDeadline; intervalEnd >= stageEarliestStart + + duration; intervalEnd -= duration) { + intervalEndTimes.add(intervalEnd); + } + } else { + for (long intervalStart = + stageEarliestStart; intervalStart <= stageDeadline + - duration; intervalStart += duration) { + intervalEndTimes.add(intervalStart + duration); + } + } + + return intervalEndTimes; + } + + protected static DurationInterval getDurationInterval(long startTime, + long endTime, RLESparseResourceAllocation planLoads, RLESparseResourceAllocation planModifications, Resource capacity, - ResourceCalculator resCalc, long step) { + RLESparseResourceAllocation netRLERes, ResourceCalculator 
resCalc, + long step, Resource requestedResources) throws PlanningException { - // Initialize the dominant loads structure - Resource dominantResources = Resource.newInstance(0, 0); + // Get the total cost associated with the duration interval + double totalCost = getDurationIntervalTotalCost(startTime, endTime, + planLoads, planModifications, capacity, resCalc, step); - // Calculate totalCost and maxLoad - double totalCost = 0.0; - for (long t = startTime; t < endTime; t += step) { + // Calculate how many gangs can fit, i.e., how many times can 'capacity' + // be allocated within the duration interval [startTime, endTime) + int gangsCanFit = getDurationIntervalGangsCanFit(startTime, endTime, + planModifications, capacity, netRLERes, resCalc, requestedResources); + + // Return the desired durationInterval + return new DurationInterval(startTime, endTime, totalCost, gangsCanFit); + + } + + protected static double getDurationIntervalTotalCost(long startTime, + long endTime, RLESparseResourceAllocation planLoads, + RLESparseResourceAllocation planModifications, Resource capacity, + ResourceCalculator resCalc, long step) throws PlanningException { + + // Compute the current resource load within the interval [startTime,endTime) + // by adding planLoads (existing load) and planModifications (load that + // corresponds to the current job). + RLESparseResourceAllocation currentLoad = + RLESparseResourceAllocation.merge(resCalc, capacity, planLoads, + planModifications, RLEOperator.add, startTime, endTime); + + // Convert load from RLESparseResourceAllocation to a Map representation + NavigableMap<Long, Resource> mapCurrentLoad = currentLoad.getCumulative(); - // Get the load - Resource load = getLoadAtTime(t, planLoads, planModifications); + // Initialize auxiliary variables + double totalCost = 0.0; + Long tPrev = -1L; + Resource loadPrev = Resources.none(); + double cost = 0.0; + + // Iterate over time points. 
For each point 't', accumulate the total cost + // that corresponds to the interval [tPrev, t). The cost associated within + // this interval is fixed for each of the time steps, therefore the cost of + // a single step is multiplied by (t - tPrev) / step. + for (Entry<Long, Resource> e : mapCurrentLoad.entrySet()) { + Long t = e.getKey(); + Resource load = e.getValue(); + if (tPrev != -1L) { + tPrev = Math.max(tPrev, startTime); + cost = calcCostOfLoad(loadPrev, capacity, resCalc); + totalCost = totalCost + cost * (t - tPrev) / step; + } - // Increase the total cost - totalCost += calcCostOfLoad(load, capacity, resCalc); + tPrev = t; + loadPrev = load; + } - // Update the dominant resources - dominantResources = Resources.componentwiseMax(dominantResources, load); + // Add the cost associated with the last interval (the for loop does not + // calculate it). + if (loadPrev != null) { + // This takes care of the corner case of a single entry + tPrev = Math.max(tPrev, startTime); + cost = calcCostOfLoad(loadPrev, capacity, resCalc); + totalCost = totalCost + cost * (endTime - tPrev) / step; } - // Return the corresponding durationInterval - return new DurationInterval(startTime, endTime, totalCost, - dominantResources); + // Return the overall cost + return totalCost; + } + + protected static int getDurationIntervalGangsCanFit(long startTime, + long endTime, RLESparseResourceAllocation planModifications, + Resource capacity, RLESparseResourceAllocation netRLERes, + ResourceCalculator resCalc, Resource requestedResources) + throws PlanningException { + + // Initialize auxiliary variables + int gangsCanFit = Integer.MAX_VALUE; + int curGangsCanFit; + + // Calculate the total amount of available resources between startTime + // and endTime, by subtracting planModifications from netRLERes + RLESparseResourceAllocation netAvailableResources = + RLESparseResourceAllocation.merge(resCalc, capacity, netRLERes, + planModifications, RLEOperator.subtractTestNonNegative, 
startTime, + endTime); + + // Convert result to a map + NavigableMap<Long, Resource> mapAvailableCapacity = + netAvailableResources.getCumulative(); + + // Iterate over the map representation. + // At each point, calculate how many times does 'requestedResources' fit. + // The result is the minimum over all time points. + for (Entry<Long, Resource> e : mapAvailableCapacity.entrySet()) { + Long t = e.getKey(); + Resource curAvailable = e.getValue(); + if (t >= endTime) { + break; + } + if (curAvailable == null) { + gangsCanFit = 0; + } else { + curGangsCanFit = (int) Math.floor(Resources.divide(resCalc, capacity, + curAvailable, requestedResources)); + if (curGangsCanFit < gangsCanFit) { + gangsCanFit = curGangsCanFit; + } + } + } + return gangsCanFit; } protected double calcCostOfInterval(long startTime, long endTime, - Map<Long, Resource> planLoads, + RLESparseResourceAllocation planLoads, RLESparseResourceAllocation planModifications, Resource capacity, ResourceCalculator resCalc, long step) { @@ -242,7 +366,8 @@ public class StageAllocatorLowCostAligned implements StageAllocator { } - protected double calcCostOfTimeSlot(long t, Map<Long, Resource> planLoads, + protected double calcCostOfTimeSlot(long t, + RLESparseResourceAllocation planLoads, RLESparseResourceAllocation planModifications, Resource capacity, ResourceCalculator resCalc) { @@ -254,17 +379,17 @@ public class StageAllocatorLowCostAligned implements StageAllocator { } - protected Resource getLoadAtTime(long t, Map<Long, Resource> planLoads, + protected Resource getLoadAtTime(long t, + RLESparseResourceAllocation planLoads, RLESparseResourceAllocation planModifications) { - Resource planLoad = planLoads.get(t); - planLoad = (planLoad == null) ? 
Resource.newInstance(0, 0) : planLoad; + Resource planLoad = planLoads.getCapacityAtTime(t); return Resources.add(planLoad, planModifications.getCapacityAtTime(t)); } - protected double calcCostOfLoad(Resource load, Resource capacity, + protected static double calcCostOfLoad(Resource load, Resource capacity, ResourceCalculator resCalc) { return resCalc.ratio(load, capacity); @@ -289,42 +414,30 @@ public class StageAllocatorLowCostAligned implements StageAllocator { private long startTime; private long endTime; private double cost; - private Resource maxLoad; + private final int gangsCanFit; // Constructor public DurationInterval(long startTime, long endTime, double cost, - Resource maxLoad) { + int gangsCanfit) { this.startTime = startTime; this.endTime = endTime; this.cost = cost; - this.maxLoad = maxLoad; + this.gangsCanFit = gangsCanfit; } // canAllocate() - boolean function, returns whether requestedResources // can be allocated during the durationInterval without // violating capacity constraints - public boolean canAllocate(Resource requestedResources, Resource capacity, - ResourceCalculator resCalc) { - - Resource updatedMaxLoad = Resources.add(maxLoad, requestedResources); - return (resCalc.compare(capacity, updatedMaxLoad, capacity) <= 0); - + public boolean canAllocate() { + return (gangsCanFit > 0); } // numCanFit() - returns the maximal number of requestedResources can be // allocated during the durationInterval without violating // capacity constraints - public int numCanFit(Resource requestedResources, Resource capacity, - ResourceCalculator resCalc) { - - // Represents the largest resource demand that can be satisfied throughout - // the entire DurationInterval (i.e., during [startTime,endTime)) - Resource availableResources = Resources.subtract(capacity, maxLoad); - - // Maximal number of requestedResources that fit inside the interval - return (int) Math.floor(Resources.divide(resCalc, capacity, - availableResources, requestedResources)); + public 
int numCanFit() { + return gangsCanFit; } public long getStartTime() { @@ -343,14 +456,6 @@ public class StageAllocatorLowCostAligned implements StageAllocator { this.endTime = value; } - public Resource getMaxLoad() { - return this.maxLoad; - } - - public void setMaxLoad(Resource value) { - this.maxLoad = value; - } - public double getTotalCost() { return this.cost; } @@ -359,11 +464,17 @@ public class StageAllocatorLowCostAligned implements StageAllocator { this.cost = value; } + @Override public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(" start: " + startTime).append(" end: " + endTime) - .append(" cost: " + cost).append(" maxLoad: " + maxLoad); + .append(" cost: " + cost).append(" gangsCanFit: " + gangsCanFit); + return sb.toString(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java deleted file mode 100644 index 547616a..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; - -import org.apache.hadoop.yarn.api.records.ReservationDefinition; -import org.apache.hadoop.yarn.api.records.ReservationRequest; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; - -/** - * Interface for setting the earliest start time of a stage in IterativePlanner. - */ -public interface StageEarliestStart { - - /** - * Computes the earliest allowed starting time for a given stage. - * - * @param plan the Plan to which the reservation must be fitted - * @param reservation the job contract - * @param index the index of the stage in the job contract - * @param currentReservationStage the stage - * @param stageDeadline the deadline of the stage set by the two phase - * planning algorithm - * - * @return the earliest allowed starting time for the stage. 
- */ - long setEarliestStartTime(Plan plan, ReservationDefinition reservation, - int index, ReservationRequest currentReservationStage, - long stageDeadline); - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java deleted file mode 100644 index 43d6584..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; - -import java.util.ListIterator; - -import org.apache.hadoop.yarn.api.records.ReservationDefinition; -import org.apache.hadoop.yarn.api.records.ReservationRequest; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; - -/** - * Sets the earliest start time of a stage proportional to the job weight. The - * interval [jobArrival, stageDeadline) is divided as follows. First, each stage - * is guaranteed at least its requested duration. Then, the stage receives a - * fraction of the remaining time. The fraction is calculated as the ratio - * between the weight (total requested resources) of the stage and the total - * weight of all proceeding stages. - */ - -public class StageEarliestStartByDemand implements StageEarliestStart { - - private long step; - - @Override - public long setEarliestStartTime(Plan plan, - ReservationDefinition reservation, int index, ReservationRequest current, - long stageDeadline) { - - step = plan.getStep(); - - // If this is the first stage, don't bother with the computation. - if (index < 1) { - return reservation.getArrival(); - } - - // Get iterator - ListIterator<ReservationRequest> li = - reservation.getReservationRequests().getReservationResources() - .listIterator(index); - ReservationRequest rr; - - // Calculate the total weight & total duration - double totalWeight = calcWeight(current); - long totalDuration = getRoundedDuration(current, plan); - - while (li.hasPrevious()) { - rr = li.previous(); - totalWeight += calcWeight(rr); - totalDuration += getRoundedDuration(rr, plan); - } - - // Compute the weight of the current stage as compared to remaining ones - double ratio = calcWeight(current) / totalWeight; - - // Estimate an early start time, such that: - // 1. 
Every stage is guaranteed to receive at least its duration - // 2. The remainder of the window is divided between stages - // proportionally to its workload (total memory consumption) - long window = stageDeadline - reservation.getArrival(); - long windowRemainder = window - totalDuration; - long earlyStart = - (long) (stageDeadline - getRoundedDuration(current, plan) - - (windowRemainder * ratio)); - - // Realign if necessary (since we did some arithmetic) - earlyStart = stepRoundUp(earlyStart, step); - - // Return - return earlyStart; - - } - - // Weight = total memory consumption of stage - protected double calcWeight(ReservationRequest stage) { - return (stage.getDuration() * stage.getCapability().getMemorySize()) - * (stage.getNumContainers()); - } - - protected long getRoundedDuration(ReservationRequest stage, Plan plan) { - return stepRoundUp(stage.getDuration(), step); - } - - protected static long stepRoundDown(long t, long step) { - return (t / step) * step; - } - - protected static long stepRoundUp(long t, long step) { - return ((t + step - 1) / step) * step; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java deleted file mode 100644 index 8347816..0000000 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; - -import org.apache.hadoop.yarn.api.records.ReservationDefinition; -import org.apache.hadoop.yarn.api.records.ReservationRequest; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; - -/** - * Sets the earliest start time of a stage as the job arrival time. 
- */ -public class StageEarliestStartByJobArrival implements StageEarliestStart { - - @Override - public long setEarliestStartTime(Plan plan, - ReservationDefinition reservation, int index, ReservationRequest current, - long stageDeadline) { - - return reservation.getArrival(); - - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionInterval.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionInterval.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionInterval.java new file mode 100644 index 0000000..8f7f5f7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionInterval.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; + +/** + * An auxiliary interface used to compute the time interval in which the stage can + * be allocated resources by {@link IterativePlanner}. + */ +public interface StageExecutionInterval { + /** + * Computes the time interval in which a given stage is allowed to execute. + * + * @param plan the Plan to which the reservation must be fitted + * @param reservation the job contract + * @param currentReservationStage the stage + * @param allocateLeft is the job allocated from left to right + * @param allocations Existing resource assignments for the job + * @return the time interval in which the stage can get resources. 
+ */ + ReservationInterval computeExecutionInterval(Plan plan, + ReservationDefinition reservation, + ReservationRequest currentReservationStage, boolean allocateLeft, + RLESparseResourceAllocation allocations); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalByDemand.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalByDemand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalByDemand.java new file mode 100644 index 0000000..95f1d4b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalByDemand.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.IterativePlanner.StageProvider; + +/** + * An implementation of {@link StageExecutionInterval}, which sets the execution + * interval of the stage. For ANY and ALL jobs, the interval is + * [jobArrival,jobDeadline]. For ORDER jobs, the maximal possible time + * interval is divided as follows: First, each stage is guaranteed at least its + * requested duration. Then, the stage receives a fraction of the remaining + * time. The fraction is calculated as the ratio between the weight (total + * requested resources) of the stage and the total weight of all remaining + * stages. 
+ */ + +public class StageExecutionIntervalByDemand implements StageExecutionInterval { + + private long step; + + @Override + public ReservationInterval computeExecutionInterval(Plan plan, + ReservationDefinition reservation, + ReservationRequest currentReservationStage, boolean allocateLeft, + RLESparseResourceAllocation allocations) { + + // Use StageExecutionIntervalUnconstrained to get the maximal interval + ReservationInterval maxInterval = + (new StageExecutionIntervalUnconstrained()).computeExecutionInterval( + plan, reservation, currentReservationStage, allocateLeft, + allocations); + + ReservationRequestInterpreter jobType = + reservation.getReservationRequests().getInterpreter(); + + // For unconstrained jobs, such as ALL & ANY, we can use the unconstrained + // version + if ((jobType != ReservationRequestInterpreter.R_ORDER) + && (jobType != ReservationRequestInterpreter.R_ORDER_NO_GAP)) { + return maxInterval; + } + + // For ORDER and ORDER_NO_GAP, take a sub-interval of maxInterval + step = plan.getStep(); + + double totalWeight = 0.0; + long totalDuration = 0; + + // Iterate over the stages that haven't been allocated. + // For allocateLeft == True, we iterate in reverse order, starting from the + // last + // stage, until we reach the current stage. + // For allocateLeft == False, we do the opposite. + StageProvider stageProvider = new StageProvider(!allocateLeft, reservation); + + while (stageProvider.hasNext()) { + ReservationRequest rr = stageProvider.next(); + totalWeight += calcWeight(rr); + totalDuration += getRoundedDuration(rr, step); + + // Stop once we reach current + if (rr == currentReservationStage) { + break; + } + } + + // Compute the weight of the current stage as compared to remaining ones + double ratio = calcWeight(currentReservationStage) / totalWeight; + + // Estimate an early start time, such that: + // 1. Every stage is guaranteed to receive at least its duration + // 2. 
The remainder of the window is divided between stages + // proportionally to its workload (total memory consumption) + long maxIntervalArrival = maxInterval.getStartTime(); + long maxIntervalDeadline = maxInterval.getEndTime(); + long window = maxIntervalDeadline - maxIntervalArrival; + long windowRemainder = window - totalDuration; + + if (allocateLeft) { + long latestEnd = + (long) (maxIntervalArrival + + getRoundedDuration(currentReservationStage, step) + + (windowRemainder * ratio)); + + // Realign if necessary (since we did some arithmetic) + latestEnd = stepRoundDown(latestEnd, step); + + // Return new interval + return new ReservationInterval(maxIntervalArrival, latestEnd); + } else { + long earlyStart = + (long) (maxIntervalDeadline + - getRoundedDuration(currentReservationStage, step) + - (windowRemainder * ratio)); + + // Realign if necessary (since we did some arithmetic) + earlyStart = stepRoundUp(earlyStart, step); + + // Return new interval + return new ReservationInterval(earlyStart, maxIntervalDeadline); + } + } + + // Weight = total memory consumption of stage + protected double calcWeight(ReservationRequest stage) { + return (stage.getDuration() * stage.getCapability().getMemorySize()) + * (stage.getNumContainers()); + } + + protected long getRoundedDuration(ReservationRequest stage, Long s) { + return stepRoundUp(stage.getDuration(), s); + } + + protected static long stepRoundDown(long t, long s) { + return (t / s) * s; + } + + protected static long stepRoundUp(long t, long s) { + return ((t + s - 1) / s) * s; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0eae1c63/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalUnconstrained.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalUnconstrained.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalUnconstrained.java new file mode 100644 index 0000000..cccd9d8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageExecutionIntervalUnconstrained.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; + +/** + * An implementation of {@link StageExecutionInterval} which gives each stage + * the maximal possible time interval, given the job constraints. Specifically, + * for ANY and ALL jobs, the interval would be [jobArrival, jobDeadline). For + * ORDER jobs, the stage cannot start before its predecessors end (if + * allocateLeft == true) or cannot end after its successors start (if + * allocateLeft == false). + */ +public class StageExecutionIntervalUnconstrained implements + StageExecutionInterval { + + @Override + public ReservationInterval computeExecutionInterval(Plan plan, + ReservationDefinition reservation, + ReservationRequest currentReservationStage, boolean allocateLeft, + RLESparseResourceAllocation allocations) { + + Long stageArrival = reservation.getArrival(); + Long stageDeadline = reservation.getDeadline(); + + ReservationRequestInterpreter jobType = + reservation.getReservationRequests().getInterpreter(); + + // Left to right + if (allocateLeft) { + // If ORDER job, change the stage arrival time + if ((jobType == ReservationRequestInterpreter.R_ORDER) + || (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP)) { + Long allocationEndTime = allocations.getLatestNonNullTime(); + if (allocationEndTime != -1) { + stageArrival = allocationEndTime; + } + } + // Right to left + } else { + // If ORDER job, change the stage deadline + if ((jobType == ReservationRequestInterpreter.R_ORDER) + || (jobType == 
ReservationRequestInterpreter.R_ORDER_NO_GAP)) { + Long allocationStartTime = allocations.getEarliestStartTime(); + if (allocationStartTime != -1) { + stageDeadline = allocationStartTime; + } + } + } + return new ReservationInterval(stageArrival, stageDeadline); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org