http://git-wip-us.apache.org/repos/asf/hadoop/blob/d894f910/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java index 3b539fa..420a1c9 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java @@ -18,16 +18,17 @@ package org.apache.hadoop.yarn.sls.scheduler; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair - .FSAppAttempt; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; +import org.apache.hadoop.yarn.sls.SLSRunner; import com.codahale.metrics.Gauge; -import org.apache.hadoop.yarn.sls.SLSRunner; @Private @Unstable @@ -37,114 +38,131 @@ public class FairSchedulerMetrics extends SchedulerMetrics { private int totalVCores = Integer.MAX_VALUE; private boolean maxReset = false; + @VisibleForTesting + public enum Metric { + DEMAND("demand"), + USAGE("usage"), + MINSHARE("minshare"), + MAXSHARE("maxshare"), + FAIRSHARE("fairshare"); + + private String value; + + Metric(String value) { + this.value = value; + } + + @VisibleForTesting + public String getValue() { + return value; + } + } + public FairSchedulerMetrics() { super(); - appTrackedMetrics.add("demand.memory"); - appTrackedMetrics.add("demand.vcores"); - appTrackedMetrics.add("usage.memory"); - appTrackedMetrics.add("usage.vcores"); - appTrackedMetrics.add("minshare.memory"); - appTrackedMetrics.add("minshare.vcores"); - appTrackedMetrics.add("maxshare.memory"); - appTrackedMetrics.add("maxshare.vcores"); - appTrackedMetrics.add("fairshare.memory"); - appTrackedMetrics.add("fairshare.vcores"); - queueTrackedMetrics.add("demand.memory"); - queueTrackedMetrics.add("demand.vcores"); - queueTrackedMetrics.add("usage.memory"); - queueTrackedMetrics.add("usage.vcores"); - queueTrackedMetrics.add("minshare.memory"); - queueTrackedMetrics.add("minshare.vcores"); - queueTrackedMetrics.add("maxshare.memory"); - queueTrackedMetrics.add("maxshare.vcores"); - queueTrackedMetrics.add("fairshare.memory"); - queueTrackedMetrics.add("fairshare.vcores"); + + for (Metric metric: Metric.values()) { + appTrackedMetrics.add(metric.value + ".memory"); + appTrackedMetrics.add(metric.value + ".vcores"); + queueTrackedMetrics.add(metric.value + ".memory"); + queueTrackedMetrics.add(metric.value + ".vcores"); + } } - - @Override - public void trackApp(ApplicationAttemptId appAttemptId, String oldAppId) { - super.trackApp(appAttemptId, oldAppId); - FairScheduler fair = (FairScheduler) scheduler; - final FSAppAttempt app = fair.getSchedulerApp(appAttemptId); - metrics.register("variable.app." + oldAppId + ".demand.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - return app.getDemand().getMemorySize(); - } - } - ); - metrics.register("variable.app." + oldAppId + ".demand.vcores", - new Gauge<Integer>() { - @Override - public Integer getValue() { - return app.getDemand().getVirtualCores(); - } - } - ); - metrics.register("variable.app." + oldAppId + ".usage.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - return app.getResourceUsage().getMemorySize(); - } - } - ); - metrics.register("variable.app." + oldAppId + ".usage.vcores", - new Gauge<Integer>() { - @Override - public Integer getValue() { - return app.getResourceUsage().getVirtualCores(); - } - } - ); - metrics.register("variable.app." + oldAppId + ".minshare.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - return app.getMinShare().getMemorySize(); - } - } - ); - metrics.register("variable.app." + oldAppId + ".minshare.vcores", - new Gauge<Long>() { - @Override - public Long getValue() { - return app.getMinShare().getMemorySize(); - } - } - ); - metrics.register("variable.app." + oldAppId + ".maxshare.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - return Math.min(app.getMaxShare().getMemorySize(), totalMemoryMB); + + private long getMemorySize(Schedulable schedulable, Metric metric) { + if (schedulable != null) { + switch (metric) { + case DEMAND: + return schedulable.getDemand().getMemorySize(); + case USAGE: + return schedulable.getResourceUsage().getMemorySize(); + case MINSHARE: + return schedulable.getMinShare().getMemorySize(); + case MAXSHARE: + return schedulable.getMaxShare().getMemorySize(); + case FAIRSHARE: + return schedulable.getFairShare().getMemorySize(); + default: + return 0L; + } + } + + return 0L; + } + + private int getVirtualCores(Schedulable schedulable, Metric metric) { + if (schedulable != null) { + switch (metric) { + case DEMAND: + return schedulable.getDemand().getVirtualCores(); + case USAGE: + return schedulable.getResourceUsage().getVirtualCores(); + case MINSHARE: + return schedulable.getMinShare().getVirtualCores(); + case MAXSHARE: + return schedulable.getMaxShare().getVirtualCores(); + case FAIRSHARE: + return schedulable.getFairShare().getVirtualCores(); + default: + return 0; + } + } + + return 0; + } + + private void registerAppMetrics(final ApplicationId appId, String oldAppId, + final Metric metric) { + metrics.register( + "variable.app." + oldAppId + "." + metric.value + ".memory", + new Gauge<Long>() { + @Override + public Long getValue() { + return getMemorySize((FSAppAttempt)getSchedulerAppAttempt(appId), + metric); + } } - } ); - metrics.register("variable.app." + oldAppId + ".maxshare.vcores", - new Gauge<Integer>() { - @Override - public Integer getValue() { - return Math.min(app.getMaxShare().getVirtualCores(), totalVCores); + + metrics.register( + "variable.app." + oldAppId + "." + metric.value + ".vcores", + new Gauge<Integer>() { + @Override + public Integer getValue() { + return getVirtualCores((FSAppAttempt)getSchedulerAppAttempt(appId), + metric); + } } - } ); - metrics.register("variable.app." + oldAppId + ".fairshare.memory", - new Gauge<Integer>() { - @Override - public Integer getValue() { - return app.getFairShare().getVirtualCores(); + } + + @Override + public void trackApp(ApplicationId appId, String oldAppId) { + super.trackApp(appId, oldAppId); + + for (Metric metric: Metric.values()) { + registerAppMetrics(appId, oldAppId, metric); + } + } + + private void registerQueueMetrics(final FSQueue queue, final Metric metric) { + metrics.register( + "variable.queue." + queue.getName() + "." + metric.value + ".memory", + new Gauge<Long>() { + @Override + public Long getValue() { + return getMemorySize(queue, metric); + } } - } ); - metrics.register("variable.app." + oldAppId + ".fairshare.vcores", - new Gauge<Integer>() { - @Override - public Integer getValue() { - return app.getFairShare().getVirtualCores(); + metrics.register( + "variable.queue." + queue.getName() + "." + metric.value + ".vcores", + new Gauge<Integer>() { + @Override + public Integer getValue() { + return getVirtualCores(queue, metric); + } } - } ); } @@ -153,68 +171,25 @@ public class FairSchedulerMetrics extends SchedulerMetrics { trackedQueues.add(queueName); FairScheduler fair = (FairScheduler) scheduler; final FSQueue queue = fair.getQueueManager().getQueue(queueName); - metrics.register("variable.queue." + queueName + ".demand.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - return queue.getDemand().getMemorySize(); - } - } - ); - metrics.register("variable.queue." + queueName + ".demand.vcores", - new Gauge<Integer>() { - @Override - public Integer getValue() { - return queue.getDemand().getVirtualCores(); - } - } - ); - metrics.register("variable.queue." + queueName + ".usage.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - return queue.getResourceUsage().getMemorySize(); - } - } - ); - metrics.register("variable.queue." + queueName + ".usage.vcores", - new Gauge<Integer>() { - @Override - public Integer getValue() { - return queue.getResourceUsage().getVirtualCores(); - } - } - ); - metrics.register("variable.queue." + queueName + ".minshare.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - return queue.getMinShare().getMemorySize(); - } - } - ); - metrics.register("variable.queue." + queueName + ".minshare.vcores", - new Gauge<Integer>() { - @Override - public Integer getValue() { - return queue.getMinShare().getVirtualCores(); - } - } - ); + registerQueueMetrics(queue, Metric.DEMAND); + registerQueueMetrics(queue, Metric.USAGE); + registerQueueMetrics(queue, Metric.MINSHARE); + registerQueueMetrics(queue, Metric.FAIRSHARE); + metrics.register("variable.queue." + queueName + ".maxshare.memory", new Gauge<Long>() { @Override public Long getValue() { - if (! maxReset && - SLSRunner.simulateInfoMap.containsKey("Number of nodes") && - SLSRunner.simulateInfoMap.containsKey("Node memory (MB)") && - SLSRunner.simulateInfoMap.containsKey("Node VCores")) { - int numNMs = Integer.parseInt( - SLSRunner.simulateInfoMap.get("Number of nodes").toString()); - int numMemoryMB = Integer.parseInt( - SLSRunner.simulateInfoMap.get("Node memory (MB)").toString()); - int numVCores = Integer.parseInt( - SLSRunner.simulateInfoMap.get("Node VCores").toString()); + if (! maxReset + && SLSRunner.getSimulateInfoMap().containsKey("Number of nodes") + && SLSRunner.getSimulateInfoMap().containsKey("Node memory (MB)") + && SLSRunner.getSimulateInfoMap().containsKey("Node VCores")) { + int numNMs = Integer.parseInt(SLSRunner.getSimulateInfoMap() + .get("Number of nodes").toString()); + int numMemoryMB = Integer.parseInt(SLSRunner.getSimulateInfoMap() + .get("Node memory (MB)").toString()); + int numVCores = Integer.parseInt(SLSRunner.getSimulateInfoMap() + .get("Node VCores").toString()); totalMemoryMB = numNMs * numMemoryMB; totalVCores = numNMs * numVCores; @@ -233,36 +208,17 @@ public class FairSchedulerMetrics extends SchedulerMetrics { } } ); - metrics.register("variable.queue." + queueName + ".fairshare.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - return queue.getFairShare().getMemorySize(); - } - } - ); - metrics.register("variable.queue." + queueName + ".fairshare.vcores", - new Gauge<Integer>() { - @Override - public Integer getValue() { - return queue.getFairShare().getVirtualCores(); - } - } - ); } @Override public void untrackQueue(String queueName) { trackedQueues.remove(queueName); - metrics.remove("variable.queue." + queueName + ".demand.memory"); - metrics.remove("variable.queue." + queueName + ".demand.vcores"); - metrics.remove("variable.queue." + queueName + ".usage.memory"); - metrics.remove("variable.queue." + queueName + ".usage.vcores"); - metrics.remove("variable.queue." + queueName + ".minshare.memory"); - metrics.remove("variable.queue." + queueName + ".minshare.vcores"); - metrics.remove("variable.queue." + queueName + ".maxshare.memory"); - metrics.remove("variable.queue." + queueName + ".maxshare.vcores"); - metrics.remove("variable.queue." + queueName + ".fairshare.memory"); - metrics.remove("variable.queue." + queueName + ".fairshare.vcores"); + + for (Metric metric: Metric.values()) { + metrics.remove("variable.queue." + queueName + "." + + metric.value + ".memory"); + metrics.remove("variable.queue." + queueName + "." + + metric.value + ".vcores"); + } } }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d894f910/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java deleted file mode 100644 index 1ba6acc..0000000 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ /dev/null @@ -1,973 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.sls.scheduler; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.ShutdownHookManager; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.QueueACL; -import org.apache.hadoop.yarn.api.records.QueueInfo; -import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; -import org.apache.hadoop.yarn.sls.SLSRunner; -import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; -import org.apache.hadoop.yarn.sls.web.SLSWebApp; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.CsvReporter; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.SlidingWindowReservoir; -import com.codahale.metrics.Timer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Private -@Unstable -public class ResourceSchedulerWrapper - extends AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> - implements SchedulerWrapper,ResourceScheduler,Configurable { - private static final String EOL = System.getProperty("line.separator"); - private static final int SAMPLING_SIZE = 60; - private ScheduledExecutorService pool; - // counters for scheduler allocate/handle operations - private Counter schedulerAllocateCounter; - private Counter schedulerHandleCounter; - private Map<SchedulerEventType, Counter> schedulerHandleCounterMap; - // Timers for scheduler allocate/handle operations - private Timer schedulerAllocateTimer; - private Timer schedulerHandleTimer; - private Map<SchedulerEventType, Timer> schedulerHandleTimerMap; - private List<Histogram> schedulerHistogramList; - private Map<Histogram, Timer> histogramTimerMap; - private Lock samplerLock; - private Lock queueLock; - - private Configuration conf; - private ResourceScheduler scheduler; - private Map<ApplicationId, String> appQueueMap = - new ConcurrentHashMap<ApplicationId, String>(); - private BufferedWriter jobRuntimeLogBW; - - // Priority of the ResourceSchedulerWrapper shutdown hook. - public static final int SHUTDOWN_HOOK_PRIORITY = 30; - - // web app - private SLSWebApp web; - - private Map<ContainerId, Resource> preemptionContainerMap = - new ConcurrentHashMap<ContainerId, Resource>(); - - // metrics - private MetricRegistry metrics; - private SchedulerMetrics schedulerMetrics; - private boolean metricsON; - private String metricsOutputDir; - private BufferedWriter metricsLogBW; - private boolean running = false; - private static Map<Class, Class> defaultSchedulerMetricsMap = - new HashMap<Class, Class>(); - static { - defaultSchedulerMetricsMap.put(FairScheduler.class, - FairSchedulerMetrics.class); - defaultSchedulerMetricsMap.put(FifoScheduler.class, - FifoSchedulerMetrics.class); - defaultSchedulerMetricsMap.put(CapacityScheduler.class, - CapacitySchedulerMetrics.class); - } - // must set by outside - private Set<String> queueSet; - private Set<String> trackedAppSet; - - public final Logger LOG = - LoggerFactory.getLogger(ResourceSchedulerWrapper.class); - - public ResourceSchedulerWrapper() { - super(ResourceSchedulerWrapper.class.getName()); - samplerLock = new ReentrantLock(); - queueLock = new ReentrantLock(); - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - // set scheduler - Class<? extends ResourceScheduler> klass = conf.getClass( - SLSConfiguration.RM_SCHEDULER, null, ResourceScheduler.class); - - scheduler = ReflectionUtils.newInstance(klass, conf); - // start metrics - metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true); - if (metricsON) { - try { - initMetrics(); - } catch (Exception e) { - e.printStackTrace(); - } - } - - ShutdownHookManager.get().addShutdownHook(new Runnable() { - @Override - public void run() { - try { - if (metricsLogBW != null) { - metricsLogBW.write("]"); - metricsLogBW.close(); - } - if (web != null) { - web.stop(); - } - tearDown(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }, SHUTDOWN_HOOK_PRIORITY); - } - - @Override - public Allocation allocate(ApplicationAttemptId attemptId, - List<ResourceRequest> resourceRequests, List<ContainerId> containerIds, - List<String> strings, List<String> strings2, - ContainerUpdates updateRequests) { - if (metricsON) { - final Timer.Context context = schedulerAllocateTimer.time(); - Allocation allocation = null; - try { - allocation = scheduler.allocate(attemptId, resourceRequests, - containerIds, strings, strings2, updateRequests); - return allocation; - } finally { - context.stop(); - schedulerAllocateCounter.inc(); - try { - updateQueueWithAllocateRequest(allocation, attemptId, - resourceRequests, containerIds); - } catch (IOException e) { - e.printStackTrace(); - } - } - } else { - return scheduler.allocate(attemptId, - resourceRequests, containerIds, strings, strings2, updateRequests); - } - } - - @Override - public void handle(SchedulerEvent schedulerEvent) { - // metrics off - if (! metricsON) { - scheduler.handle(schedulerEvent); - return; - } - if(!running) running = true; - - // metrics on - Timer.Context handlerTimer = null; - Timer.Context operationTimer = null; - - NodeUpdateSchedulerEventWrapper eventWrapper; - try { - //if (schedulerEvent instanceof NodeUpdateSchedulerEvent) { - if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE - && schedulerEvent instanceof NodeUpdateSchedulerEvent) { - eventWrapper = new NodeUpdateSchedulerEventWrapper( - (NodeUpdateSchedulerEvent)schedulerEvent); - schedulerEvent = eventWrapper; - updateQueueWithNodeUpdate(eventWrapper); - } else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED - && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { - // check if having AM Container, update resource usage information - AppAttemptRemovedSchedulerEvent appRemoveEvent = - (AppAttemptRemovedSchedulerEvent) schedulerEvent; - ApplicationAttemptId appAttemptId = - appRemoveEvent.getApplicationAttemptID(); - String queue = appQueueMap.get(appAttemptId.getApplicationId()); - SchedulerAppReport app = scheduler.getSchedulerAppInfo(appAttemptId); - if (! app.getLiveContainers().isEmpty()) { // have 0 or 1 - // should have one container which is AM container - RMContainer rmc = app.getLiveContainers().iterator().next(); - updateQueueMetrics(queue, - rmc.getContainer().getResource().getMemorySize(), - rmc.getContainer().getResource().getVirtualCores()); - } - } - - handlerTimer = schedulerHandleTimer.time(); - operationTimer = schedulerHandleTimerMap - .get(schedulerEvent.getType()).time(); - - scheduler.handle(schedulerEvent); - } finally { - if (handlerTimer != null) handlerTimer.stop(); - if (operationTimer != null) operationTimer.stop(); - schedulerHandleCounter.inc(); - schedulerHandleCounterMap.get(schedulerEvent.getType()).inc(); - - if (schedulerEvent.getType() == SchedulerEventType.APP_REMOVED - && schedulerEvent instanceof AppRemovedSchedulerEvent) { - SLSRunner.decreaseRemainingApps(); - AppRemovedSchedulerEvent appRemoveEvent = - (AppRemovedSchedulerEvent) schedulerEvent; - appQueueMap.remove(appRemoveEvent.getApplicationID()); - } else if (schedulerEvent.getType() == SchedulerEventType.APP_ADDED - && schedulerEvent instanceof AppAddedSchedulerEvent) { - AppAddedSchedulerEvent appAddEvent = - (AppAddedSchedulerEvent) schedulerEvent; - String queueName = appAddEvent.getQueue(); - appQueueMap.put(appAddEvent.getApplicationId(), queueName); - } - } - } - - private void updateQueueWithNodeUpdate( - NodeUpdateSchedulerEventWrapper eventWrapper) { - RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode(); - List<UpdatedContainerInfo> containerList = node.getContainerUpdates(); - for (UpdatedContainerInfo info : containerList) { - for (ContainerStatus status : info.getCompletedContainers()) { - ContainerId containerId = status.getContainerId(); - SchedulerAppReport app = scheduler.getSchedulerAppInfo( - containerId.getApplicationAttemptId()); - - if (app == null) { - // this happens for the AM container - // The app have already removed when the NM sends the release - // information. - continue; - } - - String queue = - appQueueMap.get(containerId.getApplicationAttemptId() - .getApplicationId()); - int releasedMemory = 0, releasedVCores = 0; - if (status.getExitStatus() == ContainerExitStatus.SUCCESS) { - for (RMContainer rmc : app.getLiveContainers()) { - if (rmc.getContainerId() == containerId) { - releasedMemory += rmc.getContainer().getResource().getMemorySize(); - releasedVCores += rmc.getContainer() - .getResource().getVirtualCores(); - break; - } - } - } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) { - if (preemptionContainerMap.containsKey(containerId)) { - Resource preResource = preemptionContainerMap.get(containerId); - releasedMemory += preResource.getMemorySize(); - releasedVCores += preResource.getVirtualCores(); - preemptionContainerMap.remove(containerId); - } - } - // update queue counters - updateQueueMetrics(queue, releasedMemory, releasedVCores); - } - } - } - - private void updateQueueWithAllocateRequest(Allocation allocation, - ApplicationAttemptId attemptId, - List<ResourceRequest> resourceRequests, - List<ContainerId> containerIds) throws IOException { - // update queue information - Resource pendingResource = Resources.createResource(0, 0); - Resource allocatedResource = Resources.createResource(0, 0); - String queueName = appQueueMap.get(attemptId.getApplicationId()); - // container requested - for (ResourceRequest request : resourceRequests) { - if (request.getResourceName().equals(ResourceRequest.ANY)) { - Resources.addTo(pendingResource, - Resources.multiply(request.getCapability(), - request.getNumContainers())); - } - } - // container allocated - for (Container container : allocation.getContainers()) { - Resources.addTo(allocatedResource, container.getResource()); - Resources.subtractFrom(pendingResource, container.getResource()); - } - // container released from AM - SchedulerAppReport report = scheduler.getSchedulerAppInfo(attemptId); - for (ContainerId containerId : containerIds) { - Container container = null; - for (RMContainer c : report.getLiveContainers()) { - if (c.getContainerId().equals(containerId)) { - container = c.getContainer(); - break; - } - } - if (container != null) { - // released allocated containers - Resources.subtractFrom(allocatedResource, container.getResource()); - } else { - for (RMContainer c : report.getReservedContainers()) { - if (c.getContainerId().equals(containerId)) { - container = c.getContainer(); - break; - } - } - if (container != null) { - // released reserved containers - Resources.subtractFrom(pendingResource, container.getResource()); - } - } - } - // containers released/preemption from scheduler - Set<ContainerId> preemptionContainers = new HashSet<ContainerId>(); - if (allocation.getContainerPreemptions() != null) { - preemptionContainers.addAll(allocation.getContainerPreemptions()); - } - if (allocation.getStrictContainerPreemptions() != null) { - preemptionContainers.addAll(allocation.getStrictContainerPreemptions()); - } - if (! preemptionContainers.isEmpty()) { - for (ContainerId containerId : preemptionContainers) { - if (! preemptionContainerMap.containsKey(containerId)) { - Container container = null; - for (RMContainer c : report.getLiveContainers()) { - if (c.getContainerId().equals(containerId)) { - container = c.getContainer(); - break; - } - } - if (container != null) { - preemptionContainerMap.put(containerId, container.getResource()); - } - } - - } - } - - // update metrics - SortedMap<String, Counter> counterMap = metrics.getCounters(); - String names[] = new String[]{ - "counter.queue." + queueName + ".pending.memory", - "counter.queue." + queueName + ".pending.cores", - "counter.queue." + queueName + ".allocated.memory", - "counter.queue." + queueName + ".allocated.cores"}; - long values[] = new long[]{pendingResource.getMemorySize(), - pendingResource.getVirtualCores(), - allocatedResource.getMemorySize(), allocatedResource.getVirtualCores()}; - for (int i = names.length - 1; i >= 0; i --) { - if (! counterMap.containsKey(names[i])) { - metrics.counter(names[i]); - counterMap = metrics.getCounters(); - } - counterMap.get(names[i]).inc(values[i]); - } - - queueLock.lock(); - try { - if (! schedulerMetrics.isTracked(queueName)) { - schedulerMetrics.trackQueue(queueName); - } - } finally { - queueLock.unlock(); - } - } - - private void tearDown() throws IOException { - // close job runtime writer - if (jobRuntimeLogBW != null) { - jobRuntimeLogBW.close(); - } - // shut pool - if (pool != null) pool.shutdown(); - } - - @SuppressWarnings("unchecked") - private void initMetrics() throws Exception { - metrics = new MetricRegistry(); - // configuration - metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR); - int metricsWebAddressPort = conf.getInt( - SLSConfiguration.METRICS_WEB_ADDRESS_PORT, - SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT); - // create SchedulerMetrics for current scheduler - String schedulerMetricsType = conf.get(scheduler.getClass().getName()); - Class schedulerMetricsClass = schedulerMetricsType == null? - defaultSchedulerMetricsMap.get(scheduler.getClass()) : - Class.forName(schedulerMetricsType); - schedulerMetrics = (SchedulerMetrics)ReflectionUtils - .newInstance(schedulerMetricsClass, new Configuration()); - schedulerMetrics.init(scheduler, metrics); - - // register various metrics - registerJvmMetrics(); - registerClusterResourceMetrics(); - registerContainerAppNumMetrics(); - registerSchedulerMetrics(); - - // .csv output - initMetricsCSVOutput(); - - // start web app to provide real-time tracking - web = new SLSWebApp(this, metricsWebAddressPort); - web.start(); - - // a thread to update histogram timer - pool = new ScheduledThreadPoolExecutor(2); - pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000, - TimeUnit.MILLISECONDS); - - // a thread to output metrics for real-tiem tracking - pool.scheduleAtFixedRate(new MetricsLogRunnable(), 0, 1000, - TimeUnit.MILLISECONDS); - - // application running information - jobRuntimeLogBW = - new BufferedWriter(new OutputStreamWriter(new FileOutputStream( - metricsOutputDir + "/jobruntime.csv"), "UTF-8")); - jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," + - "simulate_start_time,simulate_end_time" + EOL); - jobRuntimeLogBW.flush(); - } - - private void registerJvmMetrics() { - // add JVM gauges - metrics.register("variable.jvm.free.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - return Runtime.getRuntime().freeMemory(); - } - } - ); - metrics.register("variable.jvm.max.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - return Runtime.getRuntime().maxMemory(); - } - } - ); - metrics.register("variable.jvm.total.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - return Runtime.getRuntime().totalMemory(); - } - } - ); - } - - private void registerClusterResourceMetrics() { - metrics.register("variable.cluster.allocated.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - if(scheduler == null || scheduler.getRootQueueMetrics() == null) { - return 0L; - } else { - return scheduler.getRootQueueMetrics().getAllocatedMB(); - } - } - } - ); - metrics.register("variable.cluster.allocated.vcores", - new Gauge<Integer>() { - @Override - public Integer getValue() { - if(scheduler == null || scheduler.getRootQueueMetrics() == null) { - return 0; - } else { - return scheduler.getRootQueueMetrics().getAllocatedVirtualCores(); - } - } - } - ); - metrics.register("variable.cluster.available.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - if(scheduler == null || scheduler.getRootQueueMetrics() == null) { - return 0L; - } else { - return scheduler.getRootQueueMetrics().getAvailableMB(); - } - } - } - ); - metrics.register("variable.cluster.available.vcores", - new Gauge<Integer>() { - @Override - public Integer getValue() { - if(scheduler == null || scheduler.getRootQueueMetrics() == null) { - return 0; - } else { - return scheduler.getRootQueueMetrics().getAvailableVirtualCores(); - } - } - } - ); - } - - private void registerContainerAppNumMetrics() { - metrics.register("variable.running.application", - new Gauge<Integer>() { - @Override - public Integer getValue() { - if (scheduler == null || scheduler.getRootQueueMetrics() == null) { - return 0; - } else { - return scheduler.getRootQueueMetrics().getAppsRunning(); - } - } - } - ); - metrics.register("variable.running.container", - new Gauge<Integer>() { - @Override - public Integer getValue() { - if(scheduler == null || scheduler.getRootQueueMetrics() == null) { - return 0; - } else { - return scheduler.getRootQueueMetrics().getAllocatedContainers(); - } - } - } - ); - } - - private void registerSchedulerMetrics() { - samplerLock.lock(); - try { - // counters for scheduler operations - schedulerAllocateCounter = metrics.counter( - "counter.scheduler.operation.allocate"); - schedulerHandleCounter = metrics.counter( - "counter.scheduler.operation.handle"); - schedulerHandleCounterMap = new HashMap<SchedulerEventType, Counter>(); - for (SchedulerEventType e : SchedulerEventType.values()) { - Counter counter = metrics.counter( - "counter.scheduler.operation.handle." + e); - schedulerHandleCounterMap.put(e, counter); - } - // timers for scheduler operations - int timeWindowSize = conf.getInt( - SLSConfiguration.METRICS_TIMER_WINDOW_SIZE, - SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT); - schedulerAllocateTimer = new Timer( - new SlidingWindowReservoir(timeWindowSize)); - schedulerHandleTimer = new Timer( - new SlidingWindowReservoir(timeWindowSize)); - schedulerHandleTimerMap = new HashMap<SchedulerEventType, Timer>(); - for (SchedulerEventType e : SchedulerEventType.values()) { - Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize)); - schedulerHandleTimerMap.put(e, timer); - } - // histogram for scheduler operations (Samplers) - schedulerHistogramList = new ArrayList<Histogram>(); - histogramTimerMap = new HashMap<Histogram, Timer>(); - Histogram schedulerAllocateHistogram = new Histogram( - new SlidingWindowReservoir(SAMPLING_SIZE)); - metrics.register("sampler.scheduler.operation.allocate.timecost", - schedulerAllocateHistogram); - schedulerHistogramList.add(schedulerAllocateHistogram); - histogramTimerMap.put(schedulerAllocateHistogram, schedulerAllocateTimer); - Histogram schedulerHandleHistogram = new Histogram( - new SlidingWindowReservoir(SAMPLING_SIZE)); - metrics.register("sampler.scheduler.operation.handle.timecost", - schedulerHandleHistogram); - schedulerHistogramList.add(schedulerHandleHistogram); - histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer); - for (SchedulerEventType e : SchedulerEventType.values()) { - Histogram histogram = new Histogram( - new SlidingWindowReservoir(SAMPLING_SIZE)); - metrics.register( - "sampler.scheduler.operation.handle." + e + ".timecost", - histogram); - schedulerHistogramList.add(histogram); - histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e)); - } - } finally { - samplerLock.unlock(); - } - } - - private void initMetricsCSVOutput() { - int timeIntervalMS = conf.getInt( - SLSConfiguration.METRICS_RECORD_INTERVAL_MS, - SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT); - File dir = new File(metricsOutputDir + "/metrics"); - if(! dir.exists() - && ! dir.mkdirs()) { - LOG.error("Cannot create directory {}", dir.getAbsoluteFile()); - } - final CsvReporter reporter = CsvReporter.forRegistry(metrics) - .formatFor(Locale.US) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build(new File(metricsOutputDir + "/metrics")); - reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS); - } - - class HistogramsRunnable implements Runnable { - @Override - public void run() { - samplerLock.lock(); - try { - for (Histogram histogram : schedulerHistogramList) { - Timer timer = histogramTimerMap.get(histogram); - histogram.update((int) timer.getSnapshot().getMean()); - } - } finally { - samplerLock.unlock(); - } - } - } - - class MetricsLogRunnable implements Runnable { - private boolean firstLine = true; - public MetricsLogRunnable() { - try { - metricsLogBW = - new BufferedWriter(new OutputStreamWriter(new FileOutputStream( - metricsOutputDir + "/realtimetrack.json"), "UTF-8")); - metricsLogBW.write("["); - } catch (IOException e) { - e.printStackTrace(); - } - } - - @Override - public void run() { - if(running) { - // all WebApp to get real tracking json - String metrics = web.generateRealTimeTrackingMetrics(); - // output - try { - if(firstLine) { - metricsLogBW.write(metrics + EOL); - firstLine = false; - } else { - metricsLogBW.write("," + metrics + EOL); - } - metricsLogBW.flush(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - } - - // the following functions are used by AMSimulator - public void addAMRuntime(ApplicationId appId, - long traceStartTimeMS, long traceEndTimeMS, - long simulateStartTimeMS, long simulateEndTimeMS) { - if (metricsON) { - try { - // write job runtime information - StringBuilder sb = new StringBuilder(); - sb.append(appId).append(",").append(traceStartTimeMS).append(",") - .append(traceEndTimeMS).append(",").append(simulateStartTimeMS) - .append(",").append(simulateEndTimeMS); - jobRuntimeLogBW.write(sb.toString() + EOL); - jobRuntimeLogBW.flush(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - private void updateQueueMetrics(String queue, - long releasedMemory, int releasedVCores) { - // update queue counters - SortedMap<String, Counter> counterMap = metrics.getCounters(); - if (releasedMemory != 0) { - String name = "counter.queue." + queue + ".allocated.memory"; - if (! counterMap.containsKey(name)) { - metrics.counter(name); - counterMap = metrics.getCounters(); - } - counterMap.get(name).inc(-releasedMemory); - } - if (releasedVCores != 0) { - String name = "counter.queue." + queue + ".allocated.cores"; - if (! counterMap.containsKey(name)) { - metrics.counter(name); - counterMap = metrics.getCounters(); - } - counterMap.get(name).inc(-releasedVCores); - } - } - - public void setQueueSet(Set<String> queues) { - this.queueSet = queues; - } - - public Set<String> getQueueSet() { - return this.queueSet; - } - - public void setTrackedAppSet(Set<String> apps) { - this.trackedAppSet = apps; - } - - public Set<String> getTrackedAppSet() { - return this.trackedAppSet; - } - - public MetricRegistry getMetrics() { - return metrics; - } - - public SchedulerMetrics getSchedulerMetrics() { - return schedulerMetrics; - } - - // API open to out classes - public void addTrackedApp(ApplicationAttemptId appAttemptId, - String oldAppId) { - if (metricsON) { - schedulerMetrics.trackApp(appAttemptId, oldAppId); - } - } - - public void removeTrackedApp(ApplicationAttemptId appAttemptId, - String oldAppId) { - if (metricsON) { - schedulerMetrics.untrackApp(appAttemptId, oldAppId); - } - } - - @Override - public Configuration getConf() { - return conf; - } - - @SuppressWarnings("unchecked") - @Override - public void serviceInit(Configuration conf) throws Exception { - ((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) - scheduler).init(conf); - super.serviceInit(conf); - initScheduler(conf); - } - - private synchronized void initScheduler(Configuration configuration) throws - IOException { - this.applications = - new ConcurrentHashMap<ApplicationId, - SchedulerApplication<SchedulerApplicationAttempt>>(); - } - - @SuppressWarnings("unchecked") - @Override - public void serviceStart() throws Exception { - ((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) - scheduler).start(); - super.serviceStart(); - } - - @SuppressWarnings("unchecked") - @Override - public void serviceStop() throws Exception { - ((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) - scheduler).stop(); - super.serviceStop(); - } - - @Override - public void setRMContext(RMContext rmContext) { - scheduler.setRMContext(rmContext); - } - - @Override - public void reinitialize(Configuration conf, RMContext rmContext) - throws IOException { - scheduler.reinitialize(conf, rmContext); - } - - @Override - public void recover(RMStateStore.RMState rmState) throws Exception { - scheduler.recover(rmState); - } - - @Override - public QueueInfo getQueueInfo(String s, boolean b, boolean b2) - throws IOException { - return scheduler.getQueueInfo(s, b, b2); - } - - @Override - public List<QueueUserACLInfo> getQueueUserAclInfo() { - return scheduler.getQueueUserAclInfo(); - } - - @Override - public Resource getMinimumResourceCapability() { - return scheduler.getMinimumResourceCapability(); - } - - @Override - public Resource getMaximumResourceCapability() { - return scheduler.getMaximumResourceCapability(); - } - - @Override - public ResourceCalculator getResourceCalculator() { - return scheduler.getResourceCalculator(); - } - - @Override - public int getNumClusterNodes() { - return scheduler.getNumClusterNodes(); - } - - @Override - public SchedulerNodeReport getNodeReport(NodeId nodeId) { - return scheduler.getNodeReport(nodeId); - } - - @Override - public SchedulerAppReport getSchedulerAppInfo( - ApplicationAttemptId attemptId) { - return scheduler.getSchedulerAppInfo(attemptId); - } - - @Override - public QueueMetrics getRootQueueMetrics() { - return scheduler.getRootQueueMetrics(); - } - - @Override - public synchronized boolean checkAccess(UserGroupInformation callerUGI, - QueueACL acl, String queueName) { - return scheduler.checkAccess(callerUGI, acl, queueName); - } - - @Override - public ApplicationResourceUsageReport getAppResourceUsageReport( - ApplicationAttemptId appAttemptId) { - return scheduler.getAppResourceUsageReport(appAttemptId); - } - - @Override - public List<ApplicationAttemptId> getAppsInQueue(String queue) { - return scheduler.getAppsInQueue(queue); - } - - @Override - public RMContainer getRMContainer(ContainerId containerId) { - return null; - } - - @Override - public String moveApplication(ApplicationId appId, String newQueue) - throws YarnException { - return scheduler.moveApplication(appId, newQueue); - } - - @Override - @LimitedPrivate("yarn") - @Unstable - public Resource getClusterResource() { - return super.getClusterResource(); - } - - @Override - public synchronized List<Container> getTransferredContainers( - ApplicationAttemptId currentAttempt) { - return new ArrayList<Container>(); - } - - @Override - public Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> - getSchedulerApplications() { - return new HashMap<ApplicationId, - SchedulerApplication<SchedulerApplicationAttempt>>(); - } - - @Override - protected void completedContainerInternal(RMContainer rmContainer, - ContainerStatus containerStatus, RMContainerEventType event) { - // do nothing - } - - @Override - public Priority checkAndGetApplicationPriority(Priority priority, - UserGroupInformation user, String queueName, ApplicationId applicationId) - throws YarnException { - // TODO Dummy implementation. - return Priority.newInstance(0); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d894f910/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java index 2625cb7..108c2bc 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java @@ -17,176 +17,104 @@ */ package org.apache.hadoop.yarn.sls.scheduler; -import com.codahale.metrics.Counter; -import com.codahale.metrics.CsvReporter; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.SlidingWindowReservoir; -import com.codahale.metrics.Timer; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.OutputStreamWriter; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Set; -import java.util.SortedMap; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; -import org.apache.hadoop.yarn.sls.web.SLSWebApp; import org.apache.hadoop.yarn.util.resource.Resources; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.codahale.metrics.Timer; + +@Private +@Unstable public class SLSCapacityScheduler extends CapacityScheduler implements SchedulerWrapper,Configurable { - private static final String EOL = System.getProperty("line.separator"); - private static final int SAMPLING_SIZE = 60; - private ScheduledExecutorService pool; - // counters for scheduler allocate/handle operations - private Counter schedulerAllocateCounter; - private Counter schedulerHandleCounter; - private Map<SchedulerEventType, Counter> schedulerHandleCounterMap; - // Timers for scheduler allocate/handle operations - private Timer schedulerAllocateTimer; - private Timer schedulerHandleTimer; - private Map<SchedulerEventType, Timer> schedulerHandleTimerMap; - private List<Histogram> schedulerHistogramList; - private Map<Histogram, Timer> histogramTimerMap; - private Lock samplerLock; - private Lock queueLock; private Configuration conf; - + private Map<ApplicationAttemptId, String> appQueueMap = new ConcurrentHashMap<ApplicationAttemptId, String>(); - private BufferedWriter jobRuntimeLogBW; - - // Priority of the ResourceSchedulerWrapper shutdown hook. - public static final int SHUTDOWN_HOOK_PRIORITY = 30; - - // web app - private SLSWebApp web; private Map<ContainerId, Resource> preemptionContainerMap = new ConcurrentHashMap<ContainerId, Resource>(); // metrics - private MetricRegistry metrics; private SchedulerMetrics schedulerMetrics; private boolean metricsON; - private String metricsOutputDir; - private BufferedWriter metricsLogBW; - private boolean running = false; - private static Map<Class, Class> defaultSchedulerMetricsMap = - new HashMap<Class, Class>(); - static { - defaultSchedulerMetricsMap.put(FairScheduler.class, - FairSchedulerMetrics.class); - defaultSchedulerMetricsMap.put(FifoScheduler.class, - FifoSchedulerMetrics.class); - defaultSchedulerMetricsMap.put(CapacityScheduler.class, - CapacitySchedulerMetrics.class); - } - // must set by outside - private Set<String> queueSet; - private Set<String> trackedAppSet; + private Tracker tracker; - public final Logger LOG = LoggerFactory.getLogger(SLSCapacityScheduler.class); + public Tracker getTracker() { + return tracker; + } public SLSCapacityScheduler() { - samplerLock = new ReentrantLock(); - queueLock = new ReentrantLock(); + tracker = new Tracker(); } @Override public void setConf(Configuration conf) { this.conf = conf; super.setConf(conf); - // start metrics metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true); if (metricsON) { try { - initMetrics(); + schedulerMetrics = SchedulerMetrics.getInstance(conf, + CapacityScheduler.class); + schedulerMetrics.init(this, conf); } catch (Exception e) { e.printStackTrace(); } } - - ShutdownHookManager.get().addShutdownHook(new Runnable() { - @Override - public void run() { - try { - if (metricsLogBW != null) { - metricsLogBW.write("]"); - metricsLogBW.close(); - } - if (web != null) { - web.stop(); - } - tearDown(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }, SHUTDOWN_HOOK_PRIORITY); } @Override public Allocation allocate(ApplicationAttemptId attemptId, - List<ResourceRequest> resourceRequests, - List<ContainerId> containerIds, - List<String> strings, List<String> strings2, - ContainerUpdates updateRequests) { + List<ResourceRequest> resourceRequests, List<ContainerId> containerIds, + List<String> strings, List<String> strings2, + ContainerUpdates updateRequests) { if (metricsON) { - final Timer.Context context = schedulerAllocateTimer.time(); + final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer() + .time(); Allocation allocation = null; try { - allocation = super.allocate(attemptId, resourceRequests, - containerIds, strings, strings2, updateRequests); + allocation = super + .allocate(attemptId, resourceRequests, containerIds, strings, + strings2, updateRequests); return allocation; } finally { context.stop(); - schedulerAllocateCounter.inc(); + schedulerMetrics.increaseSchedulerAllocationCounter(); try { updateQueueWithAllocateRequest(allocation, attemptId, resourceRequests, containerIds); @@ -195,81 +123,83 @@ public class SLSCapacityScheduler extends CapacityScheduler implements } } } else { - return super.allocate(attemptId, - resourceRequests, containerIds, strings, strings2, updateRequests); + return super.allocate(attemptId, resourceRequests, containerIds, strings, + strings2, updateRequests); } } @Override public void handle(SchedulerEvent schedulerEvent) { - // metrics off - if (! metricsON) { - super.handle(schedulerEvent); - return; - } - if(!running) running = true; - - // metrics on - Timer.Context handlerTimer = null; - Timer.Context operationTimer = null; - - NodeUpdateSchedulerEventWrapper eventWrapper; - try { - //if (schedulerEvent instanceof NodeUpdateSchedulerEvent) { - if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE - && schedulerEvent instanceof NodeUpdateSchedulerEvent) { - eventWrapper = new NodeUpdateSchedulerEventWrapper( - (NodeUpdateSchedulerEvent)schedulerEvent); - schedulerEvent = eventWrapper; - updateQueueWithNodeUpdate(eventWrapper); - } else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED - && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { - // check if having AM Container, update resource usage information - AppAttemptRemovedSchedulerEvent appRemoveEvent = - (AppAttemptRemovedSchedulerEvent) schedulerEvent; - ApplicationAttemptId appAttemptId = - appRemoveEvent.getApplicationAttemptID(); - String queue = appQueueMap.get(appAttemptId); - SchedulerAppReport app = super.getSchedulerAppInfo(appAttemptId); - if (! app.getLiveContainers().isEmpty()) { // have 0 or 1 - // should have one container which is AM container - RMContainer rmc = app.getLiveContainers().iterator().next(); - updateQueueMetrics(queue, - rmc.getContainer().getResource().getMemory(), - rmc.getContainer().getResource().getVirtualCores()); - } - } - - handlerTimer = schedulerHandleTimer.time(); - operationTimer = schedulerHandleTimerMap - .get(schedulerEvent.getType()).time(); - - super.handle(schedulerEvent); - } finally { - if (handlerTimer != null) handlerTimer.stop(); - if (operationTimer != null) operationTimer.stop(); - schedulerHandleCounter.inc(); - schedulerHandleCounterMap.get(schedulerEvent.getType()).inc(); - - if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED - && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { - SLSRunner.decreaseRemainingApps(); - AppAttemptRemovedSchedulerEvent appRemoveEvent = - (AppAttemptRemovedSchedulerEvent) schedulerEvent; - ApplicationAttemptId appAttemptId = - appRemoveEvent.getApplicationAttemptID(); - appQueueMap.remove(appRemoveEvent.getApplicationAttemptID()); - } else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_ADDED - && schedulerEvent instanceof AppAttemptAddedSchedulerEvent) { - AppAttemptAddedSchedulerEvent appAddEvent = - (AppAttemptAddedSchedulerEvent) schedulerEvent; - SchedulerApplication app = - applications.get(appAddEvent.getApplicationAttemptId() - .getApplicationId()); - appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue() - .getQueueName()); - } - } + if (!metricsON) { + super.handle(schedulerEvent); + return; + } + + if (!schedulerMetrics.isRunning()) { + schedulerMetrics.setRunning(true); + } + + Timer.Context handlerTimer = null; + Timer.Context operationTimer = null; + + NodeUpdateSchedulerEventWrapper eventWrapper; + try { + if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE + && schedulerEvent instanceof NodeUpdateSchedulerEvent) { + eventWrapper = new NodeUpdateSchedulerEventWrapper( + (NodeUpdateSchedulerEvent)schedulerEvent); + schedulerEvent = eventWrapper; + updateQueueWithNodeUpdate(eventWrapper); + } else if (schedulerEvent.getType() == + SchedulerEventType.APP_ATTEMPT_REMOVED + && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { + // check if having AM Container, update resource usage information + AppAttemptRemovedSchedulerEvent appRemoveEvent = + (AppAttemptRemovedSchedulerEvent) schedulerEvent; + ApplicationAttemptId appAttemptId = + appRemoveEvent.getApplicationAttemptID(); + String queue = appQueueMap.get(appAttemptId); + SchedulerAppReport app = super.getSchedulerAppInfo(appAttemptId); + if (!app.getLiveContainers().isEmpty()) { // have 0 or 1 + // should have one container which is AM container + RMContainer rmc = app.getLiveContainers().iterator().next(); + schedulerMetrics.updateQueueMetricsByRelease( + rmc.getContainer().getResource(), queue); + } + } + + handlerTimer = schedulerMetrics.getSchedulerHandleTimer().time(); + operationTimer = schedulerMetrics.getSchedulerHandleTimer( + schedulerEvent.getType()).time(); + + super.handle(schedulerEvent); + } finally { + if (handlerTimer != null) { + handlerTimer.stop(); + } + if (operationTimer != null) { + operationTimer.stop(); + } + schedulerMetrics.increaseSchedulerHandleCounter(schedulerEvent.getType()); + + if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED + && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { + SLSRunner.decreaseRemainingApps(); + AppAttemptRemovedSchedulerEvent appRemoveEvent = + (AppAttemptRemovedSchedulerEvent) schedulerEvent; + appQueueMap.remove(appRemoveEvent.getApplicationAttemptID()); + } else if (schedulerEvent.getType() == + SchedulerEventType.APP_ATTEMPT_ADDED + && schedulerEvent instanceof AppAttemptAddedSchedulerEvent) { + AppAttemptAddedSchedulerEvent appAddEvent = + (AppAttemptAddedSchedulerEvent) schedulerEvent; + SchedulerApplication app = + applications.get(appAddEvent.getApplicationAttemptId() + .getApplicationId()); + appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue() + .getQueueName()); + } + } } private void updateQueueWithNodeUpdate( @@ -294,7 +224,7 @@ public class SLSCapacityScheduler extends CapacityScheduler implements if (status.getExitStatus() == ContainerExitStatus.SUCCESS) { for (RMContainer rmc : app.getLiveContainers()) { if (rmc.getContainerId() == containerId) { - releasedMemory += rmc.getContainer().getResource().getMemory(); + releasedMemory += rmc.getContainer().getResource().getMemorySize(); releasedVCores += rmc.getContainer() .getResource().getVirtualCores(); break; @@ -303,13 +233,14 @@ public class SLSCapacityScheduler extends CapacityScheduler implements } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) { if (preemptionContainerMap.containsKey(containerId)) { Resource preResource = preemptionContainerMap.get(containerId); - releasedMemory += preResource.getMemory(); + releasedMemory += preResource.getMemorySize(); releasedVCores += preResource.getVirtualCores(); preemptionContainerMap.remove(containerId); } } // update queue counters - updateQueueMetrics(queue, releasedMemory, releasedVCores); + schedulerMetrics.updateQueueMetricsByRelease( + Resource.newInstance(releasedMemory, releasedVCores), queue); } } } @@ -388,405 +319,54 @@ public class SLSCapacityScheduler extends CapacityScheduler implements } // update metrics - SortedMap<String, Counter> counterMap = metrics.getCounters(); - String names[] = new String[]{ - "counter.queue." + queueName + ".pending.memory", - "counter.queue." + queueName + ".pending.cores", - "counter.queue." + queueName + ".allocated.memory", - "counter.queue." + queueName + ".allocated.cores"}; - int values[] = new int[]{pendingResource.getMemory(), - pendingResource.getVirtualCores(), - allocatedResource.getMemory(), allocatedResource.getVirtualCores()}; - for (int i = names.length - 1; i >= 0; i --) { - if (! counterMap.containsKey(names[i])) { - metrics.counter(names[i]); - counterMap = metrics.getCounters(); - } - counterMap.get(names[i]).inc(values[i]); - } - - queueLock.lock(); - try { - if (! schedulerMetrics.isTracked(queueName)) { - schedulerMetrics.trackQueue(queueName); - } - } finally { - queueLock.unlock(); - } - } - - private void tearDown() throws IOException { - // close job runtime writer - if (jobRuntimeLogBW != null) { - jobRuntimeLogBW.close(); - } - // shut pool - if (pool != null) pool.shutdown(); - } - - @SuppressWarnings("unchecked") - private void initMetrics() throws Exception { - metrics = new MetricRegistry(); - // configuration - metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR); - int metricsWebAddressPort = conf.getInt( - SLSConfiguration.METRICS_WEB_ADDRESS_PORT, - SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT); - // create SchedulerMetrics for current scheduler - String schedulerMetricsType = conf.get(CapacityScheduler.class.getName()); - Class schedulerMetricsClass = schedulerMetricsType == null? - defaultSchedulerMetricsMap.get(CapacityScheduler.class) : - Class.forName(schedulerMetricsType); - schedulerMetrics = (SchedulerMetrics)ReflectionUtils - .newInstance(schedulerMetricsClass, new Configuration()); - schedulerMetrics.init(this, metrics); - - // register various metrics - registerJvmMetrics(); - registerClusterResourceMetrics(); - registerContainerAppNumMetrics(); - registerSchedulerMetrics(); - - // .csv output - initMetricsCSVOutput(); - - // start web app to provide real-time tracking - web = new SLSWebApp(this, metricsWebAddressPort); - web.start(); - - // a thread to update histogram timer - pool = new ScheduledThreadPoolExecutor(2); - pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000, - TimeUnit.MILLISECONDS); - - // a thread to output metrics for real-tiem tracking - pool.scheduleAtFixedRate(new MetricsLogRunnable(), 0, 1000, - TimeUnit.MILLISECONDS); - - // application running information - jobRuntimeLogBW = new BufferedWriter( - new OutputStreamWriter(new FileOutputStream( - metricsOutputDir + "/jobruntime.csv"), StandardCharsets.UTF_8)); - jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," + - "simulate_start_time,simulate_end_time" + EOL); - jobRuntimeLogBW.flush(); - } - - private void registerJvmMetrics() { - // add JVM gauges - metrics.register("variable.jvm.free.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - return Runtime.getRuntime().freeMemory(); - } - } - ); - metrics.register("variable.jvm.max.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - return Runtime.getRuntime().maxMemory(); - } - } - ); - metrics.register("variable.jvm.total.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - return Runtime.getRuntime().totalMemory(); - } - } - ); - } - - private void registerClusterResourceMetrics() { - metrics.register("variable.cluster.allocated.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - if( getRootQueueMetrics() == null) { - return 0L; - } else { - return getRootQueueMetrics().getAllocatedMB(); - } - } - } - ); - metrics.register("variable.cluster.allocated.vcores", - new Gauge<Integer>() { - @Override - public Integer getValue() { - if(getRootQueueMetrics() == null) { - return 0; - } else { - return getRootQueueMetrics().getAllocatedVirtualCores(); - } - } - } - ); - metrics.register("variable.cluster.available.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - if(getRootQueueMetrics() == null) { - return 0L; - } else { - return getRootQueueMetrics().getAvailableMB(); - } - } - } - ); - metrics.register("variable.cluster.available.vcores", - new Gauge<Integer>() { - @Override - public Integer getValue() { - if(getRootQueueMetrics() == null) { - return 0; - } else { - return getRootQueueMetrics().getAvailableVirtualCores(); - } - } - } - ); + schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource, + queueName); } - private void registerContainerAppNumMetrics() { - metrics.register("variable.running.application", - new Gauge<Integer>() { - @Override - public Integer getValue() { - if(getRootQueueMetrics() == null) { - return 0; - } else { - return getRootQueueMetrics().getAppsRunning(); - } - } - } - ); - metrics.register("variable.running.container", - new Gauge<Integer>() { - @Override - public Integer getValue() { - if(getRootQueueMetrics() == null) { - return 0; - } else { - return getRootQueueMetrics().getAllocatedContainers(); - } - } - } - ); - } - - private void registerSchedulerMetrics() { - samplerLock.lock(); - try { - // counters for scheduler operations - schedulerAllocateCounter = metrics.counter( - "counter.scheduler.operation.allocate"); - schedulerHandleCounter = metrics.counter( - "counter.scheduler.operation.handle"); - schedulerHandleCounterMap = new HashMap<SchedulerEventType, Counter>(); - for (SchedulerEventType e : SchedulerEventType.values()) { - Counter counter = metrics.counter( - "counter.scheduler.operation.handle." + e); - schedulerHandleCounterMap.put(e, counter); - } - // timers for scheduler operations - int timeWindowSize = conf.getInt( - SLSConfiguration.METRICS_TIMER_WINDOW_SIZE, - SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT); - schedulerAllocateTimer = new Timer( - new SlidingWindowReservoir(timeWindowSize)); - schedulerHandleTimer = new Timer( - new SlidingWindowReservoir(timeWindowSize)); - schedulerHandleTimerMap = new HashMap<SchedulerEventType, Timer>(); - for (SchedulerEventType e : SchedulerEventType.values()) { - Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize)); - schedulerHandleTimerMap.put(e, timer); - } - // histogram for scheduler operations (Samplers) - schedulerHistogramList = new ArrayList<Histogram>(); - histogramTimerMap = new HashMap<Histogram, Timer>(); - Histogram schedulerAllocateHistogram = new Histogram( - new SlidingWindowReservoir(SAMPLING_SIZE)); - metrics.register("sampler.scheduler.operation.allocate.timecost", - schedulerAllocateHistogram); - schedulerHistogramList.add(schedulerAllocateHistogram); - histogramTimerMap.put(schedulerAllocateHistogram, schedulerAllocateTimer); - Histogram schedulerHandleHistogram = new Histogram( - new SlidingWindowReservoir(SAMPLING_SIZE)); - metrics.register("sampler.scheduler.operation.handle.timecost", - schedulerHandleHistogram); - schedulerHistogramList.add(schedulerHandleHistogram); - histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer); - for (SchedulerEventType e : SchedulerEventType.values()) { - Histogram histogram = new Histogram( - new SlidingWindowReservoir(SAMPLING_SIZE)); - metrics.register( - "sampler.scheduler.operation.handle." + e + ".timecost", - histogram); - schedulerHistogramList.add(histogram); - histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e)); - } - } finally { - samplerLock.unlock(); + private void initQueueMetrics(CSQueue queue) { + if (queue instanceof LeafQueue) { + schedulerMetrics.initQueueMetric(queue.getQueueName()); + return; } - } - private void initMetricsCSVOutput() { - int timeIntervalMS = conf.getInt( - SLSConfiguration.METRICS_RECORD_INTERVAL_MS, - SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT); - File dir = new File(metricsOutputDir + "/metrics"); - if(! dir.exists() - && ! dir.mkdirs()) { - LOG.error("Cannot create directory {}", dir.getAbsoluteFile()); + for (CSQueue child : queue.getChildQueues()) { + initQueueMetrics(child); } - final CsvReporter reporter = CsvReporter.forRegistry(metrics) - .formatFor(Locale.US) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build(new File(metricsOutputDir + "/metrics")); - reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS); } + @Override + public void serviceInit(Configuration configuration) throws Exception { + super.serviceInit(configuration); - class HistogramsRunnable implements Runnable { - @Override - public void run() { - samplerLock.lock(); - try { - for (Histogram histogram : schedulerHistogramList) { - Timer timer = histogramTimerMap.get(histogram); - histogram.update((int) timer.getSnapshot().getMean()); - } - } finally { - samplerLock.unlock(); - } - } - } - - class MetricsLogRunnable implements Runnable { - private boolean firstLine = true; - public MetricsLogRunnable() { - try { - metricsLogBW = new BufferedWriter( - new OutputStreamWriter(new FileOutputStream( - metricsOutputDir + "/realtimetrack.json"), - StandardCharsets.UTF_8)); - metricsLogBW.write("["); - } catch (IOException e) { - e.printStackTrace(); - } - } - - @Override - public void run() { - if(running) { - // all WebApp to get real tracking json - String metrics = web.generateRealTimeTrackingMetrics(); - // output - try { - if(firstLine) { - metricsLogBW.write(metrics + EOL); - firstLine = false; - } else { - metricsLogBW.write("," + metrics + EOL); - } - metricsLogBW.flush(); - } catch (IOException e) { - e.printStackTrace(); - } - } + if (metricsON) { + initQueueMetrics(getRootQueue()); } } - // the following functions are used by AMSimulator - public void addAMRuntime(ApplicationId appId, - long traceStartTimeMS, long traceEndTimeMS, - long simulateStartTimeMS, long simulateEndTimeMS) { - + @Override + public void serviceStop() throws Exception { try { - // write job runtime information - StringBuilder sb = new StringBuilder(); - sb.append(appId).append(",").append(traceStartTimeMS).append(",") - .append(traceEndTimeMS).append(",").append(simulateStartTimeMS) - .append(",").append(simulateEndTimeMS); - jobRuntimeLogBW.write(sb.toString() + EOL); - jobRuntimeLogBW.flush(); - } catch (IOException e) { + schedulerMetrics.tearDown(); + } catch (Exception e) { e.printStackTrace(); } + super.serviceStop(); } - private void updateQueueMetrics(String queue, - int releasedMemory, int releasedVCores) { - // update queue counters - SortedMap<String, Counter> counterMap = metrics.getCounters(); - if (releasedMemory != 0) { - String name = "counter.queue." + queue + ".allocated.memory"; - if (! counterMap.containsKey(name)) { - metrics.counter(name); - counterMap = metrics.getCounters(); - } - counterMap.get(name).inc(-releasedMemory); - } - if (releasedVCores != 0) { - String name = "counter.queue." + queue + ".allocated.cores"; - if (! counterMap.containsKey(name)) { - metrics.counter(name); - counterMap = metrics.getCounters(); - } - counterMap.get(name).inc(-releasedVCores); - } - } - - public void setQueueSet(Set<String> queues) { - this.queueSet = queues; - } - - public Set<String> getQueueSet() { - return this.queueSet; - } - - public void setTrackedAppSet(Set<String> apps) { - this.trackedAppSet = apps; - } - - public Set<String> getTrackedAppSet() { - return this.trackedAppSet; - } - - public MetricRegistry getMetrics() { - return metrics; - } public SchedulerMetrics getSchedulerMetrics() { return schedulerMetrics; } - // API open to out classes - public void addTrackedApp(ApplicationAttemptId appAttemptId, - String oldAppId) { - if (metricsON) { - schedulerMetrics.trackApp(appAttemptId, oldAppId); - } - } - - public void removeTrackedApp(ApplicationAttemptId appAttemptId, - String oldAppId) { - if (metricsON) { - schedulerMetrics.untrackApp(appAttemptId, oldAppId); - } - } - @Override public Configuration getConf() { return conf; } - - - -} - + public String getRealQueueName(String queue) throws YarnException { + if (getQueue(queue) == null) { + throw new YarnException("Can't find the queue by the given name: " + queue + + "! Please check if queue " + queue + " is in the allocation file."); + } + return getQueue(queue).getQueueName(); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org