http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java new file mode 100644 index 0000000..9a0a0f0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; + +/** + * An abstract class that follows the general behavior of planning algorithms. + */ +public abstract class PlanningAlgorithm implements ReservationAgent { + + /** + * Performs the actual allocation for a ReservationDefinition within a Plan. 
+ * + * @param reservationId the identifier of the reservation + * @param user the user who owns the reservation + * @param plan the Plan to which the reservation must be fitted + * @param contract encapsulates the resources required by the user for his + * session + * @param oldReservation the existing reservation (null if none); only used to choose between updating and adding + * @return the value returned by the Plan when the reservation is added or updated + * + * @throws PlanningException if the session cannot be fitted into the plan + * @throws ContractValidationException if thrown by computeJobAllocation + */ + protected boolean allocateUser(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract, + ReservationAllocation oldReservation) throws PlanningException, + ContractValidationException { + + // Adjust the ReservationDefinition to account for system "imperfections" + // (e.g., scheduling delays for large containers). + ReservationDefinition adjustedContract = adjustContract(plan, contract); + + // Compute the job allocation + RLESparseResourceAllocation allocation = + computeJobAllocation(plan, reservationId, adjustedContract); + + // If no job allocation was found, fail + if (allocation == null) { + throw new PlanningException( + "The planning algorithm could not find a valid allocation" + + " for your request"); + } + + // Translate the allocation to a map (with zero paddings) + long step = plan.getStep(); + long jobArrival = stepRoundUp(adjustedContract.getArrival(), step); + long jobDeadline = stepRoundUp(adjustedContract.getDeadline(), step); + Map<ReservationInterval, Resource> mapAllocations = + allocationsToPaddedMap(allocation, jobArrival, jobDeadline); + + // Create the reservation + ReservationAllocation capReservation = + new InMemoryReservationAllocation(reservationId, // ID + adjustedContract, // Contract + user, // User name + plan.getQueueName(), // Queue name + findEarliestTime(mapAllocations.keySet()), // Earliest start time + findLatestTime(mapAllocations.keySet()), // Latest end time + mapAllocations, // 
Allocations + plan.getResourceCalculator(), // Resource calculator + plan.getMinimumAllocation()); // Minimum allocation + + // Add (or update) the reservation allocation + if (oldReservation != null) { + return plan.updateReservation(capReservation); + } else { + return plan.addReservation(capReservation); + } + + } + + private Map<ReservationInterval, Resource> + allocationsToPaddedMap(RLESparseResourceAllocation allocation, + long jobArrival, long jobDeadline) { + + // Convert the allocation to an interval map + Map<ReservationInterval, Resource> mapAllocations = + allocation.toIntervalMap(); + + // Zero allocation + Resource zeroResource = Resource.newInstance(0, 0); + + // Pad at the beginning + long earliestStart = findEarliestTime(mapAllocations.keySet()); + if (jobArrival < earliestStart) { + mapAllocations.put(new ReservationInterval(jobArrival, earliestStart), + zeroResource); + } + + // Pad at the end + long latestEnd = findLatestTime(mapAllocations.keySet()); + if (latestEnd < jobDeadline) { + mapAllocations.put(new ReservationInterval(latestEnd, jobDeadline), + zeroResource); + } + + return mapAllocations; + + } + + public abstract RLESparseResourceAllocation computeJobAllocation(Plan plan, + ReservationId reservationId, ReservationDefinition reservation) + throws PlanningException, ContractValidationException; + + @Override + public boolean createReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + // Allocate (no old reservation, so the reservation is added to the plan) + return allocateUser(reservationId, user, plan, contract, null); + + } + + @Override + public boolean updateReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + // Get the old allocation + ReservationAllocation oldAlloc = plan.getReservationById(reservationId); + + // Allocate (a non-null old allocation makes allocateUser update instead of add) + return allocateUser(reservationId, user, plan, contract, oldAlloc); + + } + + @Override + public boolean 
deleteReservation(ReservationId reservationId, String user, + Plan plan) throws PlanningException { + + // Delete the existing reservation + return plan.deleteReservation(reservationId); + + } + + protected static long findEarliestTime(Set<ReservationInterval> sesInt) { + + long ret = Long.MAX_VALUE; + for (ReservationInterval s : sesInt) { + if (s.getStartTime() < ret) { + ret = s.getStartTime(); + } + } + return ret; + + } + + protected static long findLatestTime(Set<ReservationInterval> sesInt) { + + long ret = Long.MIN_VALUE; + for (ReservationInterval s : sesInt) { + if (s.getEndTime() > ret) { + ret = s.getEndTime(); + } + } + return ret; + + } + + protected static long stepRoundDown(long t, long step) { + return (t / step) * step; + } + + protected static long stepRoundUp(long t, long step) { + return ((t + step - 1) / step) * step; + } + + private ReservationDefinition adjustContract(Plan plan, + ReservationDefinition originalContract) { + + // Place adjustments here. For example using QueueMetrics we can track + // large container delays per YARN-1990 + + return originalContract; + + } + +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java new file mode 100644 index 0000000..bdea2f4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/ReservationAgent.java @@ -0,0 +1,73 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *******************************************************************************/ +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; + +/** + * An entity that seeks to acquire resources to satisfy a user's contract + */ +public interface ReservationAgent { + + /** + * Create a reservation for the user that abides by the specified contract + * + * @param reservationId the identifier of the reservation to be created. + * @param user the user who wants to create the reservation + * @param plan the Plan to which the reservation must be fitted + * @param contract encapsulates the resources the user requires for his + * session + * + * @return whether the create operation was successful or not + * @throws PlanningException if the session cannot be fitted into the plan + */ + public boolean createReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException; + + /** + * Update a reservation for the user that abides by the specified contract + * + * @param reservationId the identifier of the reservation to be updated + * @param user the user who wants to update the reservation + * @param plan the Plan to which the reservation must be fitted + * @param contract encapsulates the resources the user requires for his + * reservation + * + * @return whether the update operation was successful or not + * @throws PlanningException if the reservation cannot be fitted into the plan + */ + public boolean updateReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException; + + /** + * Delete a user reservation + * + * @param reservationId the 
identifier of the reservation to be deleted + * @param user the user who wants to delete the reservation + * @param plan the Plan from which the reservation must be deleted + * + * @return whether the delete operation was successful or not + * @throws PlanningException if the delete operation fails + */ + public boolean deleteReservation(ReservationId reservationId, String user, + Plan plan) throws PlanningException; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java new file mode 100644 index 0000000..7507783 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/SimpleCapacityReplanner.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.UTCClock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This (re)planner scans a period of time from now to a maximum time window (or + * the end of the last session, whichever comes first) checking that the overall + * capacity is not violated. + * + * It greedily removes sessions in reversed order of acceptance (latest accepted + * is the first removed). 
+ */ +public class SimpleCapacityReplanner implements Planner { + + private static final Log LOG = LogFactory + .getLog(SimpleCapacityReplanner.class); + + private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0); + + private final Clock clock; + + // this allows to control the time-span of this replanning: + // time instants far into the future might be worth replanning for + // later on + private long lengthOfCheckZone; + + public SimpleCapacityReplanner() { + this(new UTCClock()); + } + + @VisibleForTesting + SimpleCapacityReplanner(Clock clock) { + this.clock = clock; + } + + @Override + public void init(String planQueueName, + ReservationSchedulerConfiguration conf) { + this.lengthOfCheckZone = conf.getEnforcementWindow(planQueueName); + } + + @Override + public void plan(Plan plan, List<ReservationDefinition> contracts) + throws PlanningException { + + if (contracts != null) { + throw new RuntimeException( + "SimpleCapacityReplanner cannot handle new reservation contracts"); + } + + ResourceCalculator resCalc = plan.getResourceCalculator(); + Resource totCap = plan.getTotalCapacity(); + long now = clock.getTime(); + + // loop on all moments in time from now to the end of the check zone + // or the end of the planned sessions whichever comes first + for (long t = now; + (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone)); + t += plan.getStep()) { + Resource excessCap = + Resources.subtract(plan.getTotalCommittedResources(t), totCap); + // if we are violating the total capacity at time t + if (Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE)) { + // sorted on reverse order of acceptance, so newest reservations first + Set<ReservationAllocation> curReservations = + new TreeSet<ReservationAllocation>(plan.getReservationsAtTime(t)); + for (Iterator<ReservationAllocation> resIter = + curReservations.iterator(); resIter.hasNext() + && Resources.greaterThan(resCalc, totCap, excessCap, + ZERO_RESOURCE);) { + ReservationAllocation reservation = resIter.next(); 
+ plan.deleteReservation(reservation.getReservationId()); + excessCap = + Resources.subtract(excessCap, reservation.getResourcesAtTime(t)); + LOG.info("Removing reservation " + reservation.getReservationId() + + " to repair physical-resource constraints in the plan: " + + plan.getQueueName()); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java new file mode 100644 index 0000000..9df6b74 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocator.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; + +/** + * Interface for allocating a single stage in IterativePlanner. + */ +public interface StageAllocator { + + /** + * Computes the allocation of a stage inside a defined time interval. + * + * @param plan the Plan to which the reservation must be fitted + * @param planLoads a 'dirty' read of the plan loads at each time + * @param planModifications the allocations performed by the planning + * algorithm which are not yet reflected by plan + * @param rr the stage (a ReservationRequest) to be allocated + * @param stageEarliestStart the arrival time (earliest starting time) set for + * the stage by the two phase planning algorithm + * @param stageDeadline the deadline of the stage set by the two phase + * planning algorithm + * + * @return The computed allocation (or null if the stage could not be + * allocated) + */ + Map<ReservationInterval, Resource> computeStageAllocation(Plan plan, + Map<Long, Resource> planLoads, + RLESparseResourceAllocation planModifications, ReservationRequest rr, + long stageEarliestStart, long stageDeadline); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java new file mode 100644 index 0000000..773fbdf --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorGreedy.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * Computes the stage allocation according to the greedy allocation rule. The + * greedy rule repeatedly allocates requested containers at the rightmost + * (latest) free interval. + */ + +public class StageAllocatorGreedy implements StageAllocator { + + @Override + public Map<ReservationInterval, Resource> computeStageAllocation(Plan plan, + Map<Long, Resource> planLoads, + RLESparseResourceAllocation planModifications, ReservationRequest rr, + long stageEarliestStart, long stageDeadline) { + + Resource totalCapacity = plan.getTotalCapacity(); + + Map<ReservationInterval, Resource> allocationRequests = + new HashMap<ReservationInterval, Resource>(); + + // compute the gang as a resource and get the duration + Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency()); + long dur = rr.getDuration(); + long step = plan.getStep(); + + // ceil the duration to the next multiple of the plan step + if (dur % step != 0) { + dur += (step - (dur % step)); + } + + // we know for sure that this division has no remainder (part of contract + // with user, validated before) + int gangsToPlace = rr.getNumContainers() / rr.getConcurrency(); + + int maxGang = 0; + + // loop trying to place until we are done, or we are considering + // an invalid range of times + while (gangsToPlace > 0 && stageDeadline - dur >= stageEarliestStart) { + + // as we run along we remember how many gangs we can fit, and what + 
// was the most constraining moment in time (we will restart just + // after that to place the next batch) + maxGang = gangsToPlace; + long minPoint = stageDeadline; + int curMaxGang = maxGang; + + // start placing at deadline (excluded due to [,) interval semantics) and + // move backward + for (long t = stageDeadline - plan.getStep(); t >= stageDeadline - dur + && maxGang > 0; t = t - plan.getStep()) { + + // compute net available resources + Resource netAvailableRes = Resources.clone(totalCapacity); + // Resources.addTo(netAvailableRes, oldResCap); + Resources.subtractFrom(netAvailableRes, + plan.getTotalCommittedResources(t)); + Resources.subtractFrom(netAvailableRes, + planModifications.getCapacityAtTime(t)); + + // compute maximum number of gangs we could fit + curMaxGang = + (int) Math.floor(Resources.divide(plan.getResourceCalculator(), + totalCapacity, netAvailableRes, gang)); + + // pick the minimum between available resources in this instant, and how + // many gangs we have to place + curMaxGang = Math.min(gangsToPlace, curMaxGang); + + // compare with previous max, and set it. also remember *where* we found + // the minimum (useful for next attempts) + if (curMaxGang <= maxGang) { + maxGang = curMaxGang; + minPoint = t; + } + } + + // if we were able to place any gang, record this, and decrement + // gangsToPlace + if (maxGang > 0) { + gangsToPlace -= maxGang; + + ReservationInterval reservationInt = + new ReservationInterval(stageDeadline - dur, stageDeadline); + Resource reservationRes = + Resources.multiply(rr.getCapability(), rr.getConcurrency() + * maxGang); + // remember occupied space (plan is read-only till we find a plausible + // allocation for the entire request). 
This is needed since we might be + // placing other ReservationRequest within the same + // ReservationDefinition, + // and we must avoid double-counting the available resources + planModifications.addInterval(reservationInt, reservationRes); + allocationRequests.put(reservationInt, reservationRes); + + } + + // reset our new starting point (stageDeadline) to the most constraining + // point so far, we will look "left" of that to find more places where + // to schedule gangs (for sure nothing on the "right" of this point can + // fit a full gang). + stageDeadline = minPoint; + } + + // if no gangs are left to place we succeed and return the allocation + if (gangsToPlace == 0) { + return allocationRequests; + } else { + // If we are here it is because we did not manage to satisfy this request. + // So we need to remove unwanted side-effects from planModifications (needed + // for ANY). + for (Map.Entry<ReservationInterval, Resource> tempAllocation + : allocationRequests.entrySet()) { + planModifications.removeInterval(tempAllocation.getKey(), + tempAllocation.getValue()); + } + // and return null to signal failure in this allocation + return null; + } + + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java new file mode 100644 index 0000000..4b5763d --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageAllocatorLowCostAligned.java @@ -0,0 +1,360 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import java.util.Comparator; +import java.util.Map; +import java.util.TreeSet; + +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * A stage allocator that iteratively allocates containers in the + * {@link DurationInterval} with lowest overall cost. The algorithm only + * considers intervals of the form: [stageDeadline - (n+1)*duration, + * stageDeadline - n*duration) for an integer n. 
This guarantees that the + * allocations are aligned (as opposed to overlapping duration intervals). + * + * The smoothnessFactor parameter controls the number of containers that are + * simultaneously allocated in each iteration of the algorithm. + */ + +public class StageAllocatorLowCostAligned implements StageAllocator { + + // Smoothness factor + private int smoothnessFactor = 10; + + // Constructor + public StageAllocatorLowCostAligned() { + } + + // Constructor + public StageAllocatorLowCostAligned(int smoothnessFactor) { + this.smoothnessFactor = smoothnessFactor; + } + + // computeJobAllocation() + @Override + public Map<ReservationInterval, Resource> computeStageAllocation( + Plan plan, Map<Long, Resource> planLoads, + RLESparseResourceAllocation planModifications, ReservationRequest rr, + long stageEarliestStart, long stageDeadline) { + + // Initialize + ResourceCalculator resCalc = plan.getResourceCalculator(); + Resource capacity = plan.getTotalCapacity(); + long step = plan.getStep(); + + // Create allocationRequestsearlies + RLESparseResourceAllocation allocationRequests = + new RLESparseResourceAllocation(plan.getResourceCalculator(), + plan.getMinimumAllocation()); + + // Initialize parameters + long duration = stepRoundUp(rr.getDuration(), step); + int windowSizeInDurations = + (int) ((stageDeadline - stageEarliestStart) / duration); + int totalGangs = rr.getNumContainers() / rr.getConcurrency(); + int numContainersPerGang = rr.getConcurrency(); + Resource gang = + Resources.multiply(rr.getCapability(), numContainersPerGang); + + // Set maxGangsPerUnit + int maxGangsPerUnit = + (int) Math.max( + Math.floor(((double) totalGangs) / windowSizeInDurations), 1); + maxGangsPerUnit = Math.max(maxGangsPerUnit / smoothnessFactor, 1); + + // If window size is too small, return null + if (windowSizeInDurations <= 0) { + return null; + } + + // Initialize tree sorted by costs + TreeSet<DurationInterval> durationIntervalsSortedByCost = + new 
TreeSet<DurationInterval>(new Comparator<DurationInterval>() { + @Override + public int compare(DurationInterval val1, DurationInterval val2) { + + int cmp = Double.compare(val1.getTotalCost(), val2.getTotalCost()); + if (cmp != 0) { + return cmp; + } + + return (-1) * Long.compare(val1.getEndTime(), val2.getEndTime()); + } + }); + + // Add durationIntervals that end at (endTime - n*duration) for some n. + for (long intervalEnd = stageDeadline; intervalEnd >= stageEarliestStart + + duration; intervalEnd -= duration) { + + long intervalStart = intervalEnd - duration; + + // Get duration interval [intervalStart,intervalEnd) + DurationInterval durationInterval = + getDurationInterval(intervalStart, intervalEnd, planLoads, + planModifications, capacity, resCalc, step); + + // If the interval can fit a gang, add it to the tree + if (durationInterval.canAllocate(gang, capacity, resCalc)) { + durationIntervalsSortedByCost.add(durationInterval); + } + } + + // Allocate + int remainingGangs = totalGangs; + while (remainingGangs > 0) { + + // If no durationInterval can fit a gang, break and return null + if (durationIntervalsSortedByCost.isEmpty()) { + break; + } + + // Get best duration interval + DurationInterval bestDurationInterval = + durationIntervalsSortedByCost.first(); + int numGangsToAllocate = Math.min(maxGangsPerUnit, remainingGangs); + + // Add it + remainingGangs -= numGangsToAllocate; + + ReservationInterval reservationInt = + new ReservationInterval(bestDurationInterval.getStartTime(), + bestDurationInterval.getEndTime()); + + Resource reservationRes = + Resources.multiply(rr.getCapability(), rr.getConcurrency() + * numGangsToAllocate); + + planModifications.addInterval(reservationInt, reservationRes); + allocationRequests.addInterval(reservationInt, reservationRes); + + // Remove from tree + durationIntervalsSortedByCost.remove(bestDurationInterval); + + // Get updated interval + DurationInterval updatedDurationInterval = + 
getDurationInterval(bestDurationInterval.getStartTime(), + bestDurationInterval.getStartTime() + duration, planLoads, + planModifications, capacity, resCalc, step); + + // Add to tree, if possible + if (updatedDurationInterval.canAllocate(gang, capacity, resCalc)) { + durationIntervalsSortedByCost.add(updatedDurationInterval); + } + + } + + // Get the final allocation + Map<ReservationInterval, Resource> allocations = + allocationRequests.toIntervalMap(); + + // If no gangs are left to place we succeed and return the allocation + if (remainingGangs <= 0) { + return allocations; + } else { + + // If we are here is because we did not manage to satisfy this request. + // We remove unwanted side-effect from planModifications (needed for ANY). + for (Map.Entry<ReservationInterval, Resource> tempAllocation + : allocations.entrySet()) { + + planModifications.removeInterval(tempAllocation.getKey(), + tempAllocation.getValue()); + + } + // Return null to signal failure in this allocation + return null; + + } + + } + + protected DurationInterval getDurationInterval(long startTime, long endTime, + Map<Long, Resource> planLoads, + RLESparseResourceAllocation planModifications, Resource capacity, + ResourceCalculator resCalc, long step) { + + // Initialize the dominant loads structure + Resource dominantResources = Resource.newInstance(0, 0); + + // Calculate totalCost and maxLoad + double totalCost = 0.0; + for (long t = startTime; t < endTime; t += step) { + + // Get the load + Resource load = getLoadAtTime(t, planLoads, planModifications); + + // Increase the total cost + totalCost += calcCostOfLoad(load, capacity, resCalc); + + // Update the dominant resources + dominantResources = Resources.componentwiseMax(dominantResources, load); + + } + + // Return the corresponding durationInterval + return new DurationInterval(startTime, endTime, totalCost, + dominantResources); + + } + + protected double calcCostOfInterval(long startTime, long endTime, + Map<Long, Resource> 
planLoads, + RLESparseResourceAllocation planModifications, Resource capacity, + ResourceCalculator resCalc, long step) { + + // Sum costs in the interval [startTime,endTime) + double totalCost = 0.0; + for (long t = startTime; t < endTime; t += step) { + totalCost += calcCostOfTimeSlot(t, planLoads, planModifications, capacity, + resCalc); + } + + // Return sum + return totalCost; + + } + + protected double calcCostOfTimeSlot(long t, Map<Long, Resource> planLoads, + RLESparseResourceAllocation planModifications, Resource capacity, + ResourceCalculator resCalc) { + + // Get the current load at time t + Resource load = getLoadAtTime(t, planLoads, planModifications); + + // Return cost + return calcCostOfLoad(load, capacity, resCalc); + + } + + protected Resource getLoadAtTime(long t, Map<Long, Resource> planLoads, + RLESparseResourceAllocation planModifications) { + + Resource planLoad = planLoads.get(t); + planLoad = (planLoad == null) ? Resource.newInstance(0, 0) : planLoad; + + return Resources.add(planLoad, planModifications.getCapacityAtTime(t)); + + } + + protected double calcCostOfLoad(Resource load, Resource capacity, + ResourceCalculator resCalc) { + + return resCalc.ratio(load, capacity); + + } + + protected static long stepRoundDown(long t, long step) { + return (t / step) * step; + } + + protected static long stepRoundUp(long t, long step) { + return ((t + step - 1) / step) * step; + } + + /** + * An inner class that represents an interval, typically of length duration. + * The class holds the total cost of the interval and the maximal load inside + * the interval in each dimension (both calculated externally). 
+ */ + protected static class DurationInterval { + + private long startTime; + private long endTime; + private double cost; + private Resource maxLoad; + + // Constructor + public DurationInterval(long startTime, long endTime, double cost, + Resource maxLoad) { + this.startTime = startTime; + this.endTime = endTime; + this.cost = cost; + this.maxLoad = maxLoad; + } + + // canAllocate() - boolean function, returns whether requestedResources + // can be allocated during the durationInterval without + // violating capacity constraints + public boolean canAllocate(Resource requestedResources, Resource capacity, + ResourceCalculator resCalc) { + + Resource updatedMaxLoad = Resources.add(maxLoad, requestedResources); + return (resCalc.compare(capacity, updatedMaxLoad, capacity) <= 0); + + } + + // numCanFit() - returns the maximal number of requestedResources can be + // allocated during the durationInterval without violating + // capacity constraints + public int numCanFit(Resource requestedResources, Resource capacity, + ResourceCalculator resCalc) { + + // Represents the largest resource demand that can be satisfied throughout + // the entire DurationInterval (i.e., during [startTime,endTime)) + Resource availableResources = Resources.subtract(capacity, maxLoad); + + // Maximal number of requestedResources that fit inside the interval + return (int) Math.floor(Resources.divide(resCalc, capacity, + availableResources, requestedResources)); + + } + + public long getStartTime() { + return this.startTime; + } + + public void setStartTime(long value) { + this.startTime = value; + } + + public long getEndTime() { + return this.endTime; + } + + public void setEndTime(long value) { + this.endTime = value; + } + + public Resource getMaxLoad() { + return this.maxLoad; + } + + public void setMaxLoad(Resource value) { + this.maxLoad = value; + } + + public double getTotalCost() { + return this.cost; + } + + public void setTotalCost(double value) { + this.cost = value; + } + + } 
+} http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java new file mode 100644 index 0000000..547616a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStart.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; + +/** + * Interface for setting the earliest start time of a stage in IterativePlanner. + */ +public interface StageEarliestStart { + + /** + * Computes the earliest allowed starting time for a given stage. + * + * @param plan the Plan to which the reservation must be fitted + * @param reservation the job contract + * @param index the index of the stage in the job contract + * @param currentReservationStage the stage + * @param stageDeadline the deadline of the stage set by the two phase + * planning algorithm + * + * @return the earliest allowed starting time for the stage. + */ + long setEarliestStartTime(Plan plan, ReservationDefinition reservation, + int index, ReservationRequest currentReservationStage, + long stageDeadline); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java new file mode 100644 index 0000000..5a46a4e --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByDemand.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import java.util.ListIterator; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; + +/** + * Sets the earliest start time of a stage proportional to the job weight. The + * interval [jobArrival, stageDeadline) is divided as follows. First, each stage + * is guaranteed at least its requested duration. Then, the stage receives a + * fraction of the remaining time. The fraction is calculated as the ratio + * between the weight (total requested resources) of the stage and the total + * weight of all proceeding stages. 
+ */ + +public class StageEarliestStartByDemand implements StageEarliestStart { + + private long step; + + @Override + public long setEarliestStartTime(Plan plan, + ReservationDefinition reservation, int index, ReservationRequest current, + long stageDeadline) { + + step = plan.getStep(); + + // If this is the first stage, don't bother with the computation. + if (index < 1) { + return reservation.getArrival(); + } + + // Get iterator + ListIterator<ReservationRequest> li = + reservation.getReservationRequests().getReservationResources() + .listIterator(index); + ReservationRequest rr; + + // Calculate the total weight & total duration + double totalWeight = calcWeight(current); + long totalDuration = getRoundedDuration(current, plan); + + while (li.hasPrevious()) { + rr = li.previous(); + totalWeight += calcWeight(rr); + totalDuration += getRoundedDuration(rr, plan); + } + + // Compute the weight of the current stage as compared to remaining ones + double ratio = calcWeight(current) / totalWeight; + + // Estimate an early start time, such that: + // 1. Every stage is guaranteed to receive at least its duration + // 2. 
The remainder of the window is divided between stages + // proportionally to its workload (total memory consumption) + long window = stageDeadline - reservation.getArrival(); + long windowRemainder = window - totalDuration; + long earlyStart = + (long) (stageDeadline - getRoundedDuration(current, plan) + - (windowRemainder * ratio)); + + // Realign if necessary (since we did some arithmetic) + earlyStart = stepRoundUp(earlyStart, step); + + // Return + return earlyStart; + + } + + // Weight = total memory consumption of stage + protected double calcWeight(ReservationRequest stage) { + return (stage.getDuration() * stage.getCapability().getMemory()) + * (stage.getNumContainers()); + } + + protected long getRoundedDuration(ReservationRequest stage, Plan plan) { + return stepRoundUp(stage.getDuration(), step); + } + + protected static long stepRoundDown(long t, long step) { + return (t / step) * step; + } + + protected static long stepRoundUp(long t, long step) { + return ((t + step - 1) / step) * step; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java new file mode 100644 index 0000000..8347816 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/StageEarliestStartByJobArrival.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; + +/** + * Sets the earliest start time of a stage as the job arrival time. 
+ */ +public class StageEarliestStartByJobArrival implements StageEarliestStart { + + @Override + public long setEarliestStartTime(Plan plan, + ReservationDefinition reservation, int index, ReservationRequest current, + long stageDeadline) { + + return reservation.getArrival(); + + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java new file mode 100644 index 0000000..1d37ce5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TryManyReservationAgents.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; + +/** + * A planning algorithm that invokes several other planning algorithms according + * to a given order. If one of the planners succeeds, the allocation it + * generates is returned. 
+ */ +public class TryManyReservationAgents implements ReservationAgent { + + // Planning algorithms + private final List<ReservationAgent> algs; + + // Constructor + public TryManyReservationAgents(List<ReservationAgent> algs) { + this.algs = new LinkedList<ReservationAgent>(algs); + } + + @Override + public boolean createReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + // Save the planning exception + PlanningException planningException = null; + + // Try all of the algorithms, in order + for (ReservationAgent alg : algs) { + + try { + if (alg.createReservation(reservationId, user, plan, contract)) { + return true; + } + } catch (PlanningException e) { + planningException = e; + } + + } + + // If all of the algorithms failed and one of the algorithms threw an + // exception, throw the last planning exception + if (planningException != null) { + throw planningException; + } + + // If all of the algorithms failed, return false + return false; + + } + + @Override + public boolean updateReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + // Save the planning exception + PlanningException planningException = null; + + // Try all of the algorithms, in order + for (ReservationAgent alg : algs) { + + try { + if (alg.updateReservation(reservationId, user, plan, contract)) { + return true; + } + } catch (PlanningException e) { + planningException = e; + } + + } + + // If all of the algorithms failed and one of the algorithms threw an + // exception, throw the last planning exception + if (planningException != null) { + throw planningException; + } + + // If all of the algorithms failed, return false + return false; + + } + + @Override + public boolean deleteReservation(ReservationId reservationId, String user, + Plan plan) throws PlanningException { + + return plan.deleteReservation(reservationId); + + } + +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index be1d69a..adb9dcf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; @@ -89,7 +90,7 @@ public class ReservationSystemTestUtil { Assert.assertEquals(planQName, plan.getQueueName()); Assert.assertEquals(8192, plan.getTotalCapacity().getMemory()); Assert.assertTrue( - 
plan.getReservationAgent() instanceof GreedyReservationAgent); + plan.getReservationAgent() instanceof AlignedPlannerWithGreedy); Assert.assertTrue( plan.getSharingPolicy() instanceof CapacityOverTimePolicy); } @@ -102,7 +103,7 @@ public class ReservationSystemTestUtil { Assert.assertEquals(newQ, newPlan.getQueueName()); Assert.assertEquals(1024, newPlan.getTotalCapacity().getMemory()); Assert - .assertTrue(newPlan.getReservationAgent() instanceof GreedyReservationAgent); + .assertTrue(newPlan.getReservationAgent() instanceof AlignedPlannerWithGreedy); Assert .assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java index 19f876d..f608c3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException; - +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java index b8663f6..15f9a89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java index f294eaf..4b685b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java @@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/156f24ea/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java index e9a4f50..43316f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairSchedulerPlanFollower.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;