http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java new file mode 100644 index 0000000..81f6648 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java @@ -0,0 +1,346 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls.scheduler; + +import com.codahale.metrics.Timer; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.sls.SLSRunner; +import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import 
java.util.concurrent.ConcurrentHashMap; + +@Private +@Unstable +public class SLSFairScheduler extends FairScheduler + implements SchedulerWrapper, Configurable { + private SchedulerMetrics schedulerMetrics; + private boolean metricsON; + private Tracker tracker; + + private Map<ContainerId, Resource> preemptionContainerMap = + new ConcurrentHashMap<>(); + + public SchedulerMetrics getSchedulerMetrics() { + return schedulerMetrics; + } + + public Tracker getTracker() { + return tracker; + } + + public SLSFairScheduler() { + tracker = new Tracker(); + } + + @Override + public void setConf(Configuration conf) { + super.setConfig(conf); + + metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true); + if (metricsON) { + try { + schedulerMetrics = SchedulerMetrics.getInstance(conf, + FairScheduler.class); + schedulerMetrics.init(this, conf); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + @Override + public Allocation allocate(ApplicationAttemptId attemptId, + List<ResourceRequest> resourceRequests, List<ContainerId> containerIds, + List<String> blacklistAdditions, List<String> blacklistRemovals, + ContainerUpdates updateRequests) { + if (metricsON) { + final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer() + .time(); + Allocation allocation = null; + try { + allocation = super.allocate(attemptId, resourceRequests, containerIds, + blacklistAdditions, blacklistRemovals, updateRequests); + return allocation; + } finally { + context.stop(); + schedulerMetrics.increaseSchedulerAllocationCounter(); + try { + updateQueueWithAllocateRequest(allocation, attemptId, + resourceRequests, containerIds); + } catch (IOException e) { + e.printStackTrace(); + } + } + } else { + return super.allocate(attemptId, resourceRequests, containerIds, + blacklistAdditions, blacklistRemovals, updateRequests); + } + } + + @Override + public void handle(SchedulerEvent schedulerEvent) { + // metrics off + if (!metricsON) { + super.handle(schedulerEvent); + return; + } + + // metrics on + if(!schedulerMetrics.isRunning()) { + schedulerMetrics.setRunning(true); + } + + Timer.Context handlerTimer = null; + Timer.Context operationTimer = null; + + NodeUpdateSchedulerEventWrapper eventWrapper; + try { + if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE + && schedulerEvent instanceof NodeUpdateSchedulerEvent) { + eventWrapper = new NodeUpdateSchedulerEventWrapper( + (NodeUpdateSchedulerEvent)schedulerEvent); + schedulerEvent = eventWrapper; + updateQueueWithNodeUpdate(eventWrapper); + } else if ( + schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED + && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { + // check if having AM Container, update resource usage information + AppAttemptRemovedSchedulerEvent appRemoveEvent = + (AppAttemptRemovedSchedulerEvent) schedulerEvent; + ApplicationAttemptId appAttemptId = + appRemoveEvent.getApplicationAttemptID(); + String queueName = getSchedulerApp(appAttemptId).getQueue().getName(); + SchedulerAppReport app = getSchedulerAppInfo(appAttemptId); + if (!app.getLiveContainers().isEmpty()) { // have 0 or 1 + // should have one container which is AM container + RMContainer rmc = app.getLiveContainers().iterator().next(); + schedulerMetrics.updateQueueMetricsByRelease( + rmc.getContainer().getResource(), queueName); + } + } + + handlerTimer = schedulerMetrics.getSchedulerHandleTimer().time(); + operationTimer = schedulerMetrics.getSchedulerHandleTimer( + schedulerEvent.getType()).time(); + + 
super.handle(schedulerEvent); + } finally { + if (handlerTimer != null) { + handlerTimer.stop(); + } + if (operationTimer != null) { + operationTimer.stop(); + } + schedulerMetrics.increaseSchedulerHandleCounter(schedulerEvent.getType()); + + if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED + && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { + SLSRunner.decreaseRemainingApps(); + } + } + } + + private void updateQueueWithNodeUpdate( + NodeUpdateSchedulerEventWrapper eventWrapper) { + RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode(); + List<UpdatedContainerInfo> containerList = node.getContainerUpdates(); + for (UpdatedContainerInfo info : containerList) { + for (ContainerStatus status : info.getCompletedContainers()) { + ContainerId containerId = status.getContainerId(); + SchedulerAppReport app = super.getSchedulerAppInfo( + containerId.getApplicationAttemptId()); + + if (app == null) { + // this happens for the AM container + // The app have already removed when the NM sends the release + // information. + continue; + } + + int releasedMemory = 0, releasedVCores = 0; + if (status.getExitStatus() == ContainerExitStatus.SUCCESS) { + for (RMContainer rmc : app.getLiveContainers()) { + if (rmc.getContainerId() == containerId) { + Resource resource = rmc.getContainer().getResource(); + releasedMemory += resource.getMemorySize(); + releasedVCores += resource.getVirtualCores(); + break; + } + } + } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) { + if (preemptionContainerMap.containsKey(containerId)) { + Resource preResource = preemptionContainerMap.get(containerId); + releasedMemory += preResource.getMemorySize(); + releasedVCores += preResource.getVirtualCores(); + preemptionContainerMap.remove(containerId); + } + } + // update queue counters + String queue = getSchedulerApp(containerId.getApplicationAttemptId()). 
+ getQueueName(); + schedulerMetrics.updateQueueMetricsByRelease( + Resource.newInstance(releasedMemory, releasedVCores), queue); + } + } + } + + private void updateQueueWithAllocateRequest(Allocation allocation, + ApplicationAttemptId attemptId, + List<ResourceRequest> resourceRequests, + List<ContainerId> containerIds) throws IOException { + // update queue information + Resource pendingResource = Resources.createResource(0, 0); + Resource allocatedResource = Resources.createResource(0, 0); + // container requested + for (ResourceRequest request : resourceRequests) { + if (request.getResourceName().equals(ResourceRequest.ANY)) { + Resources.addTo(pendingResource, + Resources.multiply(request.getCapability(), + request.getNumContainers())); + } + } + // container allocated + for (Container container : allocation.getContainers()) { + Resources.addTo(allocatedResource, container.getResource()); + Resources.subtractFrom(pendingResource, container.getResource()); + } + // container released from AM + SchedulerAppReport report = super.getSchedulerAppInfo(attemptId); + for (ContainerId containerId : containerIds) { + Container container = null; + for (RMContainer c : report.getLiveContainers()) { + if (c.getContainerId().equals(containerId)) { + container = c.getContainer(); + break; + } + } + if (container != null) { + // released allocated containers + Resources.subtractFrom(allocatedResource, container.getResource()); + } else { + for (RMContainer c : report.getReservedContainers()) { + if (c.getContainerId().equals(containerId)) { + container = c.getContainer(); + break; + } + } + if (container != null) { + // released reserved containers + Resources.subtractFrom(pendingResource, container.getResource()); + } + } + } + // containers released/preemption from scheduler + Set<ContainerId> preemptionContainers = new HashSet<ContainerId>(); + if (allocation.getContainerPreemptions() != null) { + preemptionContainers.addAll(allocation.getContainerPreemptions()); + } + if (allocation.getStrictContainerPreemptions() != null) { + preemptionContainers.addAll(allocation.getStrictContainerPreemptions()); + } + if (!preemptionContainers.isEmpty()) { + for (ContainerId containerId : preemptionContainers) { + if (!preemptionContainerMap.containsKey(containerId)) { + Container container = null; + for (RMContainer c : report.getLiveContainers()) { + if (c.getContainerId().equals(containerId)) { + container = c.getContainer(); + break; + } + } + if (container != null) { + preemptionContainerMap.put(containerId, container.getResource()); + } + } + + } + } + + // update metrics + String queueName = getSchedulerApp(attemptId).getQueueName(); + schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource, + queueName); + } + + private void initQueueMetrics(FSQueue queue) { + if (queue instanceof FSLeafQueue) { + schedulerMetrics.initQueueMetric(queue.getQueueName()); + return; + } + + for (FSQueue child : queue.getChildQueues()) { + initQueueMetrics(child); + } + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + if (metricsON) { + initQueueMetrics(getQueueManager().getRootQueue()); + } + } + + @Override + public void serviceStop() throws Exception { + try { + schedulerMetrics.tearDown(); + } catch (Exception e) { + e.printStackTrace(); + } + super.serviceStop(); + } + + public String getRealQueueName(String queue) throws YarnException { + if (!getQueueManager().exists(queue)) { + throw new YarnException("Can't find the queue by the given name: 
" + queue + + "! Please check if queue " + queue + " is in the allocation file."); + } + return getQueueManager().getQueue(queue).getQueueName(); + } +} +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java index ecf516d..3c7aa77 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java @@ -18,66 +18,218 @@ package org.apache.hadoop.yarn.sls.scheduler; +import java.io.BufferedWriter; +import java.io.IOException; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.SortedMap; +import java.util.Locale; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.Lock; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler - .SchedulerAppReport; - +import com.codahale.metrics.Counter; +import com.codahale.metrics.CsvReporter; import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.SlidingWindowReservoir; +import com.codahale.metrics.Timer; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; +import org.apache.hadoop.yarn.sls.web.SLSWebApp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Private @Unstable public abstract class SchedulerMetrics { + private static final String EOL = System.getProperty("line.separator"); + private static final int SAMPLING_SIZE = 60; + private static final Logger LOG = + LoggerFactory.getLogger(SchedulerMetrics.class); + protected ResourceScheduler scheduler; protected Set<String> 
trackedQueues; protected MetricRegistry metrics; protected Set<String> appTrackedMetrics; protected Set<String> queueTrackedMetrics; - + + private Configuration conf; + private ScheduledExecutorService pool; + private SLSWebApp web; + + // metrics + private String metricsOutputDir; + private BufferedWriter metricsLogBW; + private BufferedWriter jobRuntimeLogBW; + private boolean running = false; + + // counters for scheduler allocate/handle operations + private Counter schedulerAllocateCounter; + private Counter schedulerHandleCounter; + private Map<SchedulerEventType, Counter> schedulerHandleCounterMap; + + // Timers for scheduler allocate/handle operations + private Timer schedulerAllocateTimer; + private Timer schedulerHandleTimer; + private Map<SchedulerEventType, Timer> schedulerHandleTimerMap; + private List<Histogram> schedulerHistogramList; + private Map<Histogram, Timer> histogramTimerMap; + private Lock samplerLock; + private Lock queueLock; + + static Class getSchedulerMetricsClass(Configuration conf, + Class schedulerClass) throws ClassNotFoundException { + Class metricClass = null; + String schedulerMetricsType = conf.get(schedulerClass.getName()); + if (schedulerMetricsType != null) { + metricClass = Class.forName(schedulerMetricsType); + } + + if (schedulerClass.equals(FairScheduler.class)) { + metricClass = FairSchedulerMetrics.class; + } else if (schedulerClass.equals(CapacityScheduler.class)) { + metricClass = CapacitySchedulerMetrics.class; + } else if (schedulerClass.equals(FifoScheduler.class)) { + metricClass = FifoSchedulerMetrics.class; + } + + return metricClass; + } + + @SuppressWarnings("unchecked") + static SchedulerMetrics getInstance(Configuration conf, Class schedulerClass) + throws ClassNotFoundException { + Class schedulerMetricClass = getSchedulerMetricsClass(conf, schedulerClass); + return (SchedulerMetrics) ReflectionUtils + .newInstance(schedulerMetricClass, new Configuration()); + } + public SchedulerMetrics() { - appTrackedMetrics = new HashSet<String>(); + metrics = new MetricRegistry(); + + appTrackedMetrics = new HashSet<>(); appTrackedMetrics.add("live.containers"); appTrackedMetrics.add("reserved.containers"); - queueTrackedMetrics = new HashSet<String>(); + + queueTrackedMetrics = new HashSet<>(); + trackedQueues = new HashSet<>(); + + samplerLock = new ReentrantLock(); + queueLock = new ReentrantLock(); } - - public void init(ResourceScheduler scheduler, MetricRegistry metrics) { - this.scheduler = scheduler; - this.trackedQueues = new HashSet<String>(); - this.metrics = metrics; + + void init(ResourceScheduler resourceScheduler, Configuration config) + throws Exception { + this.scheduler = resourceScheduler; + this.conf = config; + + metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR); + + // register various metrics + registerJvmMetrics(); + registerClusterResourceMetrics(); + registerContainerAppNumMetrics(); + registerSchedulerMetrics(); + + // .csv output + initMetricsCSVOutput(); + + // start web app to provide real-time tracking + int metricsWebAddressPort = conf.getInt( + SLSConfiguration.METRICS_WEB_ADDRESS_PORT, + SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT); + web = new SLSWebApp((SchedulerWrapper)scheduler, metricsWebAddressPort); + web.start(); + + // a thread to update histogram timer + pool = new ScheduledThreadPoolExecutor(2); + pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000, + TimeUnit.MILLISECONDS); + + // a thread to output metrics for real-tiem tracking + pool.scheduleAtFixedRate(new 
MetricsLogRunnable(), 0, 1000, + TimeUnit.MILLISECONDS); + + // application running information + jobRuntimeLogBW = + new BufferedWriter(new OutputStreamWriter(new FileOutputStream( + metricsOutputDir + "/jobruntime.csv"), "UTF-8")); + jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," + + "simulate_start_time,simulate_end_time" + EOL); + jobRuntimeLogBW.flush(); } - - public void trackApp(final ApplicationAttemptId appAttemptId, - String oldAppId) { + + public MetricRegistry getMetrics() { + return metrics; + } + + protected SchedulerApplicationAttempt getSchedulerAppAttempt( + ApplicationId appId) { + AbstractYarnScheduler yarnScheduler = (AbstractYarnScheduler)scheduler; + SchedulerApplication app = (SchedulerApplication)yarnScheduler + .getSchedulerApplications().get(appId); + if (app == null) { + return null; + } + return app.getCurrentAppAttempt(); + } + + public void trackApp(final ApplicationId appId, String oldAppId) { metrics.register("variable.app." + oldAppId + ".live.containers", - new Gauge<Integer>() { - @Override - public Integer getValue() { - SchedulerAppReport app = scheduler.getSchedulerAppInfo(appAttemptId); - return app.getLiveContainers().size(); + new Gauge<Integer>() { + @Override + public Integer getValue() { + SchedulerApplicationAttempt appAttempt = + getSchedulerAppAttempt(appId); + if (appAttempt != null) { + return appAttempt.getLiveContainers().size(); + } else { + return 0; + } + } } - } ); metrics.register("variable.app." + oldAppId + ".reserved.containers", - new Gauge<Integer>() { - @Override - public Integer getValue() { - SchedulerAppReport app = scheduler.getSchedulerAppInfo(appAttemptId); - return app.getReservedContainers().size(); + new Gauge<Integer>() { + @Override + public Integer getValue() { + SchedulerApplicationAttempt appAttempt = + getSchedulerAppAttempt(appId); + if (appAttempt != null) { + return appAttempt.getReservedContainers().size(); + } else { + return 0; + } + } } - } ); } - - public void untrackApp(ApplicationAttemptId appAttemptId, - String oldAppId) { + + public void untrackApp(String oldAppId) { for (String m : appTrackedMetrics) { metrics.remove("variable.app." + oldAppId + "." 
+ m); } @@ -98,7 +250,392 @@ public abstract class SchedulerMetrics { public Set<String> getAppTrackedMetrics() { return appTrackedMetrics; } + public Set<String> getQueueTrackedMetrics() { return queueTrackedMetrics; } + + private void registerJvmMetrics() { + // add JVM gauges + metrics.register("variable.jvm.free.memory", + new Gauge<Long>() { + @Override + public Long getValue() { + return Runtime.getRuntime().freeMemory(); + } + } + ); + metrics.register("variable.jvm.max.memory", + new Gauge<Long>() { + @Override + public Long getValue() { + return Runtime.getRuntime().maxMemory(); + } + } + ); + metrics.register("variable.jvm.total.memory", + new Gauge<Long>() { + @Override + public Long getValue() { + return Runtime.getRuntime().totalMemory(); + } + } + ); + } + + private void registerClusterResourceMetrics() { + metrics.register("variable.cluster.allocated.memory", + new Gauge<Long>() { + @Override + public Long getValue() { + if (scheduler.getRootQueueMetrics() == null) { + return 0L; + } else { + return scheduler.getRootQueueMetrics().getAllocatedMB(); + } + } + } + ); + metrics.register("variable.cluster.allocated.vcores", + new Gauge<Integer>() { + @Override + public Integer getValue() { + if (scheduler.getRootQueueMetrics() == null) { + return 0; + } else { + return scheduler.getRootQueueMetrics().getAllocatedVirtualCores(); + } + } + } + ); + metrics.register("variable.cluster.available.memory", + new Gauge<Long>() { + @Override + public Long getValue() { + if (scheduler.getRootQueueMetrics() == null) { + return 0L; + } else { + return scheduler.getRootQueueMetrics().getAvailableMB(); + } + } + } + ); + metrics.register("variable.cluster.available.vcores", + new Gauge<Integer>() { + @Override + public Integer getValue() { + if (scheduler.getRootQueueMetrics() == null) { + return 0; + } else { + return scheduler.getRootQueueMetrics().getAvailableVirtualCores(); + } + } + } + ); + } + + private void registerContainerAppNumMetrics() { + metrics.register("variable.running.application", + new Gauge<Integer>() { + @Override + public Integer getValue() { + if (scheduler.getRootQueueMetrics() == null) { + return 0; + } else { + return scheduler.getRootQueueMetrics().getAppsRunning(); + } + } + } + ); + metrics.register("variable.running.container", + new Gauge<Integer>() { + @Override + public Integer getValue() { + if (scheduler.getRootQueueMetrics() == null) { + return 0; + } else { + return scheduler.getRootQueueMetrics().getAllocatedContainers(); + } + } + } + ); + } + + private void registerSchedulerMetrics() { + samplerLock.lock(); + try { + // counters for scheduler operations + schedulerAllocateCounter = metrics.counter( + "counter.scheduler.operation.allocate"); + schedulerHandleCounter = metrics.counter( + "counter.scheduler.operation.handle"); + schedulerHandleCounterMap = new HashMap<>(); + for (SchedulerEventType e : SchedulerEventType.values()) { + Counter counter = metrics.counter( + "counter.scheduler.operation.handle." 
+ e); + schedulerHandleCounterMap.put(e, counter); + } + // timers for scheduler operations + int timeWindowSize = conf.getInt( + SLSConfiguration.METRICS_TIMER_WINDOW_SIZE, + SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT); + schedulerAllocateTimer = new Timer( + new SlidingWindowReservoir(timeWindowSize)); + schedulerHandleTimer = new Timer( + new SlidingWindowReservoir(timeWindowSize)); + schedulerHandleTimerMap = new HashMap<>(); + for (SchedulerEventType e : SchedulerEventType.values()) { + Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize)); + schedulerHandleTimerMap.put(e, timer); + } + // histogram for scheduler operations (Samplers) + schedulerHistogramList = new ArrayList<>(); + histogramTimerMap = new HashMap<>(); + Histogram schedulerAllocateHistogram = new Histogram( + new SlidingWindowReservoir(SAMPLING_SIZE)); + metrics.register("sampler.scheduler.operation.allocate.timecost", + schedulerAllocateHistogram); + schedulerHistogramList.add(schedulerAllocateHistogram); + histogramTimerMap.put(schedulerAllocateHistogram, schedulerAllocateTimer); + Histogram schedulerHandleHistogram = new Histogram( + new SlidingWindowReservoir(SAMPLING_SIZE)); + metrics.register("sampler.scheduler.operation.handle.timecost", + schedulerHandleHistogram); + schedulerHistogramList.add(schedulerHandleHistogram); + histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer); + for (SchedulerEventType e : SchedulerEventType.values()) { + Histogram histogram = new Histogram( + new SlidingWindowReservoir(SAMPLING_SIZE)); + metrics.register( + "sampler.scheduler.operation.handle." + e + ".timecost", + histogram); + schedulerHistogramList.add(histogram); + histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e)); + } + } finally { + samplerLock.unlock(); + } + } + + private void initMetricsCSVOutput() { + int timeIntervalMS = conf.getInt( + SLSConfiguration.METRICS_RECORD_INTERVAL_MS, + SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT); + File dir = new File(metricsOutputDir + "/metrics"); + if(!dir.exists() && !dir.mkdirs()) { + LOG.error("Cannot create directory {}", dir.getAbsoluteFile()); + } + final CsvReporter reporter = CsvReporter.forRegistry(metrics) + .formatFor(Locale.US) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build(new File(metricsOutputDir + "/metrics")); + reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS); + } + + boolean isRunning() { + return running; + } + + void setRunning(boolean running) { + this.running = running; + } + + class HistogramsRunnable implements Runnable { + @Override + public void run() { + samplerLock.lock(); + try { + for (Histogram histogram : schedulerHistogramList) { + Timer timer = histogramTimerMap.get(histogram); + histogram.update((int) timer.getSnapshot().getMean()); + } + } finally { + samplerLock.unlock(); + } + } + } + + class MetricsLogRunnable implements Runnable { + private boolean firstLine = true; + + MetricsLogRunnable() { + try { + metricsLogBW = + new BufferedWriter(new OutputStreamWriter(new FileOutputStream( + metricsOutputDir + "/realtimetrack.json"), "UTF-8")); + metricsLogBW.write("["); + } catch (IOException e) { + LOG.info(e.getMessage()); + } + } + + @Override + public void run() { + if(running) { + // all WebApp to get real tracking json + String trackingMetrics = web.generateRealTimeTrackingMetrics(); + // output + try { + if(firstLine) { + metricsLogBW.write(trackingMetrics + EOL); + firstLine = false; + } else { + metricsLogBW.write("," + 
trackingMetrics + EOL); + } + metricsLogBW.flush(); + } catch (IOException e) { + LOG.info(e.getMessage()); + } + } + } + } + + void tearDown() throws Exception { + if (metricsLogBW != null) { + metricsLogBW.write("]"); + metricsLogBW.close(); + } + + if (web != null) { + web.stop(); + } + + if (jobRuntimeLogBW != null) { + jobRuntimeLogBW.close(); + } + + if (pool != null) { + pool.shutdown(); + } + } + + void increaseSchedulerAllocationCounter() { + schedulerAllocateCounter.inc(); + } + + void increaseSchedulerHandleCounter(SchedulerEventType schedulerEventType) { + schedulerHandleCounter.inc(); + schedulerHandleCounterMap.get(schedulerEventType).inc(); + } + + Timer getSchedulerAllocateTimer() { + return schedulerAllocateTimer; + } + + Timer getSchedulerHandleTimer() { + return schedulerHandleTimer; + } + + Timer getSchedulerHandleTimer(SchedulerEventType schedulerEventType) { + return schedulerHandleTimerMap.get(schedulerEventType); + } + + private enum QueueMetric { + PENDING_MEMORY("pending.memory"), + PENDING_VCORES("pending.cores"), + ALLOCATED_MEMORY("allocated.memory"), + ALLOCATED_VCORES("allocated.cores"); + + private String value; + + QueueMetric(String value) { + this.value = value; + } + } + + private String getQueueMetricName(String queue, QueueMetric metric) { + return "counter.queue." + queue + "." + metric.value; + } + + private void traceQueueIfNotTraced(String queue) { + queueLock.lock(); + try { + if (!isTracked(queue)) { + trackQueue(queue); + } + } finally { + queueLock.unlock(); + } + } + + void initQueueMetric(String queueName){ + SortedMap<String, Counter> counterMap = metrics.getCounters(); + + for (QueueMetric queueMetric : QueueMetric.values()) { + String metricName = getQueueMetricName(queueName, queueMetric); + if (!counterMap.containsKey(metricName)) { + metrics.counter(metricName); + counterMap = metrics.getCounters(); + } + } + + traceQueueIfNotTraced(queueName); + } + + void updateQueueMetrics(Resource pendingResource, Resource allocatedResource, + String queueName) { + SortedMap<String, Counter> counterMap = metrics.getCounters(); + for(QueueMetric metric : QueueMetric.values()) { + String metricName = getQueueMetricName(queueName, metric); + if (!counterMap.containsKey(metricName)) { + metrics.counter(metricName); + counterMap = metrics.getCounters(); + } + + if (metric == QueueMetric.PENDING_MEMORY) { + counterMap.get(metricName).inc(pendingResource.getMemorySize()); + } else if (metric == QueueMetric.PENDING_VCORES) { + counterMap.get(metricName).inc(pendingResource.getVirtualCores()); + } else if (metric == QueueMetric.ALLOCATED_MEMORY) { + counterMap.get(metricName).inc(allocatedResource.getMemorySize()); + } else if (metric == QueueMetric.ALLOCATED_VCORES){ + counterMap.get(metricName).inc(allocatedResource.getVirtualCores()); + } + } + + traceQueueIfNotTraced(queueName); + } + + void updateQueueMetricsByRelease(Resource releaseResource, String queue) { + SortedMap<String, Counter> counterMap = metrics.getCounters(); + String name = getQueueMetricName(queue, QueueMetric.ALLOCATED_MEMORY); + if (!counterMap.containsKey(name)) { + metrics.counter(name); + counterMap = metrics.getCounters(); + } + counterMap.get(name).inc(-releaseResource.getMemorySize()); + + String vcoreMetric = + getQueueMetricName(queue, QueueMetric.ALLOCATED_VCORES); + if (!counterMap.containsKey(vcoreMetric)) { + metrics.counter(vcoreMetric); + counterMap = metrics.getCounters(); + } + counterMap.get(vcoreMetric).inc(-releaseResource.getVirtualCores()); + } + + public void 
addTrackedApp(ApplicationId appId, + String oldAppId) { + trackApp(appId, oldAppId); + } + + public void removeTrackedApp(String oldAppId) { + untrackApp(oldAppId); + } + + public void addAMRuntime(ApplicationId appId, long traceStartTimeMS, + long traceEndTimeMS, long simulateStartTimeMS, long simulateEndTimeMS) { + try { + // write job runtime information + StringBuilder sb = new StringBuilder(); + sb.append(appId).append(",").append(traceStartTimeMS).append(",") + .append(traceEndTimeMS).append(",").append(simulateStartTimeMS) + .append(",").append(simulateEndTimeMS); + jobRuntimeLogBW.write(sb.toString() + EOL); + jobRuntimeLogBW.flush(); + } catch (IOException e) { + LOG.info(e.getMessage()); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java index 44629f5..7112b1a 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java @@ -17,27 +17,16 @@ */ package org.apache.hadoop.yarn.sls.scheduler; -import java.util.Set; - -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; - -import com.codahale.metrics.MetricRegistry; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.exceptions.YarnException; +@Private +@Unstable public interface SchedulerWrapper { + SchedulerMetrics getSchedulerMetrics(); - public MetricRegistry getMetrics(); - public SchedulerMetrics getSchedulerMetrics(); - public Set<String> getQueueSet(); - public void setQueueSet(Set<String> queues); - public Set<String> getTrackedAppSet(); - public void setTrackedAppSet(Set<String> apps); - public void addTrackedApp(ApplicationAttemptId appAttemptId, - String oldAppId); - public void removeTrackedApp(ApplicationAttemptId appAttemptId, - String oldAppId); - public void addAMRuntime(ApplicationId appId, - long traceStartTimeMS, long traceEndTimeMS, - long simulateStartTimeMS, long simulateEndTimeMS); + Tracker getTracker(); + String getRealQueueName(String queue) throws YarnException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java index d352904..19cfe88 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.yarn.sls.scheduler; -import java.io.IOException; import java.text.MessageFormat; import java.util.Queue; import java.util.concurrent.DelayQueue; @@ -27,7 +26,6 @@ import java.util.concurrent.TimeUnit; import 
org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.exceptions.YarnException; @Private @Unstable @@ -148,8 +146,8 @@ public class TaskRunner { @SuppressWarnings("unchecked") public void start() { - if (executor != null) { - throw new IllegalStateException("Already started"); + if (executor != null && !executor.isTerminated()) { + throw new IllegalStateException("Executor already running"); } DelayQueue preStartQueue = queue; @@ -164,8 +162,9 @@ public class TaskRunner { } } - public void stop() { + public void stop() throws InterruptedException { executor.shutdownNow(); + executor.awaitTermination(20, TimeUnit.SECONDS); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/Tracker.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/Tracker.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/Tracker.java new file mode 100644 index 0000000..42a5c3c --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/Tracker.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls.scheduler; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +import java.util.Set; + +@Private +@Unstable +public class Tracker { + private Set<String> queueSet; + private Set<String> trackedAppSet; + + public void setQueueSet(Set<String> queues) { + queueSet = queues; + } + + public Set<String> getQueueSet() { + return queueSet; + } + + public void setTrackedAppSet(Set<String> apps) { + trackedAppSet = apps; + } + + public Set<String> getTrackedAppSet() { + return trackedAppSet; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java new file mode 100644 index 0000000..3ed81e1 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls.synthetic; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.math3.distribution.LogNormalDistribution; +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TaskStatus.State; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.tools.rumen.*; +import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.mapreduce.MRJobConfig.QUEUE_NAME; + +/** + * Generates random task data for a synthetic job. + */ +public class SynthJob implements JobStory { + + @SuppressWarnings("StaticVariableName") + private static Log LOG = LogFactory.getLog(SynthJob.class); + + private final Configuration conf; + private final int id; + + @SuppressWarnings("ConstantName") + private static final AtomicInteger sequence = new AtomicInteger(0); + private final String name; + private final String queueName; + private final SynthJobClass jobClass; + + // job timing + private final long submitTime; + private final long duration; + private final long deadline; + + private final int numMapTasks; + private final int numRedTasks; + private final long mapMaxMemory; + private final long reduceMaxMemory; + private final long mapMaxVcores; + private final long reduceMaxVcores; + private final long[] mapRuntime; + private final float[] reduceRuntime; + private long totMapRuntime; + private long totRedRuntime; + + public SynthJob(JDKRandomGenerator rand, Configuration conf, + SynthJobClass jobClass, long actualSubmissionTime) { + + this.conf = conf; + this.jobClass = jobClass; + + this.duration = MILLISECONDS.convert(jobClass.getDur(), SECONDS); + this.numMapTasks = jobClass.getMtasks(); + this.numRedTasks = jobClass.getRtasks(); + + // sample memory distributions, correct for sub-minAlloc sizes + long tempMapMaxMemory = jobClass.getMapMaxMemory(); + this.mapMaxMemory = tempMapMaxMemory < MRJobConfig.DEFAULT_MAP_MEMORY_MB + ? MRJobConfig.DEFAULT_MAP_MEMORY_MB : tempMapMaxMemory; + long tempReduceMaxMemory = jobClass.getReduceMaxMemory(); + this.reduceMaxMemory = + tempReduceMaxMemory < MRJobConfig.DEFAULT_REDUCE_MEMORY_MB + ? 
MRJobConfig.DEFAULT_REDUCE_MEMORY_MB : tempReduceMaxMemory; + + // sample vcores distributions, correct for sub-minAlloc sizes + long tempMapMaxVCores = jobClass.getMapMaxVcores(); + this.mapMaxVcores = tempMapMaxVCores < MRJobConfig.DEFAULT_MAP_CPU_VCORES + ? MRJobConfig.DEFAULT_MAP_CPU_VCORES : tempMapMaxVCores; + long tempReduceMaxVcores = jobClass.getReduceMaxVcores(); + this.reduceMaxVcores = + tempReduceMaxVcores < MRJobConfig.DEFAULT_REDUCE_CPU_VCORES + ? MRJobConfig.DEFAULT_REDUCE_CPU_VCORES : tempReduceMaxVcores; + + if (numMapTasks > 0) { + conf.setLong(MRJobConfig.MAP_MEMORY_MB, this.mapMaxMemory); + conf.set(MRJobConfig.MAP_JAVA_OPTS, + "-Xmx" + (this.mapMaxMemory - 100) + "m"); + } + + if (numRedTasks > 0) { + conf.setLong(MRJobConfig.REDUCE_MEMORY_MB, this.reduceMaxMemory); + conf.set(MRJobConfig.REDUCE_JAVA_OPTS, + "-Xmx" + (this.reduceMaxMemory - 100) + "m"); + } + + boolean hasDeadline = + (rand.nextDouble() <= jobClass.jobClass.chance_of_reservation); + + LogNormalDistribution deadlineFactor = + SynthUtils.getLogNormalDist(rand, jobClass.jobClass.deadline_factor_avg, + jobClass.jobClass.deadline_factor_stddev); + + double deadlineFactorSample = + (deadlineFactor != null) ? deadlineFactor.sample() : -1; + + this.queueName = jobClass.workload.getQueueName(); + + this.submitTime = MILLISECONDS.convert(actualSubmissionTime, SECONDS); + + this.deadline = + hasDeadline ? MILLISECONDS.convert(actualSubmissionTime, SECONDS) + + (long) Math.ceil(deadlineFactorSample * duration) : -1; + + conf.set(QUEUE_NAME, queueName); + + // name and initialize job randomness + final long seed = rand.nextLong(); + rand.setSeed(seed); + id = sequence.getAndIncrement(); + + name = String.format(jobClass.getClassName() + "_%06d", id); + LOG.debug(name + " (" + seed + ")"); + + LOG.info("JOB TIMING`: job: " + name + " submission:" + submitTime + + " deadline:" + deadline + " duration:" + duration + + " deadline-submission: " + (deadline - submitTime)); + + // generate map and reduce runtimes + mapRuntime = new long[numMapTasks]; + for (int i = 0; i < numMapTasks; i++) { + mapRuntime[i] = jobClass.getMapTimeSample(); + totMapRuntime += mapRuntime[i]; + } + reduceRuntime = new float[numRedTasks]; + for (int i = 0; i < numRedTasks; i++) { + reduceRuntime[i] = jobClass.getReduceTimeSample(); + totRedRuntime += (long) Math.ceil(reduceRuntime[i]); + } + } + + public boolean hasDeadline() { + return deadline > 0; + } + + @Override + public String getName() { + return name; + } + + @Override + public String getUser() { + return jobClass.getUserName(); + } + + @Override + public JobID getJobID() { + return new JobID("job_mock_" + name, id); + } + + @Override + public Values getOutcome() { + return Values.SUCCESS; + } + + @Override + public long getSubmissionTime() { + return submitTime; + } + + @Override + public int getNumberMaps() { + return numMapTasks; + } + + @Override + public int getNumberReduces() { + return numRedTasks; + } + + @Override + public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) { + switch (taskType) { + case MAP: + return new TaskInfo(-1, -1, -1, -1, mapMaxMemory, mapMaxVcores); + case REDUCE: + return new TaskInfo(-1, -1, -1, -1, reduceMaxMemory, reduceMaxVcores); + default: + throw new IllegalArgumentException("Not interested"); + } + } + + @Override + public InputSplit[] getInputSplits() { + throw new UnsupportedOperationException(); + } + + @Override + public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber, + int taskAttemptNumber) { + switch 
(taskType) { + case MAP: + return new MapTaskAttemptInfo(State.SUCCEEDED, + getTaskInfo(taskType, taskNumber), mapRuntime[taskNumber], null); + + case REDUCE: + // We assume uniform split between pull/sort/reduce + // aligned with naive progress reporting assumptions + return new ReduceTaskAttemptInfo(State.SUCCEEDED, + getTaskInfo(taskType, taskNumber), + (long) Math.round((reduceRuntime[taskNumber] / 3)), + (long) Math.round((reduceRuntime[taskNumber] / 3)), + (long) Math.round((reduceRuntime[taskNumber] / 3)), null); + + default: + break; + } + throw new UnsupportedOperationException(); + } + + @Override + public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber, + int taskAttemptNumber, int locality) { + throw new UnsupportedOperationException(); + } + + @Override + public org.apache.hadoop.mapred.JobConf getJobConf() { + return new JobConf(conf); + } + + @Override + public String getQueueName() { + return queueName; + } + + @Override + public String toString() { + return "SynthJob [\n" + " workload=" + jobClass.getWorkload().getId() + + "\n" + " jobClass=" + + jobClass.getWorkload().getClassList().indexOf(jobClass) + "\n" + + " conf=" + conf + ",\n" + " id=" + id + ",\n" + " name=" + name + + ",\n" + " mapRuntime=" + Arrays.toString(mapRuntime) + ",\n" + + " reduceRuntime=" + Arrays.toString(reduceRuntime) + ",\n" + + " submitTime=" + submitTime + ",\n" + " numMapTasks=" + numMapTasks + + ",\n" + " numRedTasks=" + numRedTasks + ",\n" + " mapMaxMemory=" + + mapMaxMemory + ",\n" + " reduceMaxMemory=" + reduceMaxMemory + ",\n" + + " queueName=" + queueName + "\n" + "]"; + } + + public SynthJobClass getJobClass() { + return jobClass; + } + + public long getTotalSlotTime() { + return totMapRuntime + totRedRuntime; + } + + public long getDuration() { + return duration; + } + + public long getDeadline() { + return deadline; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SynthJob)) { + return false; + } + SynthJob o = (SynthJob) other; + return Arrays.equals(mapRuntime, o.mapRuntime) + && Arrays.equals(reduceRuntime, o.reduceRuntime) + && submitTime == o.submitTime && numMapTasks == o.numMapTasks + && numRedTasks == o.numRedTasks && mapMaxMemory == o.mapMaxMemory + && reduceMaxMemory == o.reduceMaxMemory + && mapMaxVcores == o.mapMaxVcores + && reduceMaxVcores == o.reduceMaxVcores && queueName.equals(o.queueName) + && jobClass.equals(o.jobClass) && totMapRuntime == o.totMapRuntime + && totRedRuntime == o.totRedRuntime; + } + + @Override + public int hashCode() { + // could have a bad distr; investigate if a relevant use case exists + return jobClass.hashCode() * (int) submitTime; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java new file mode 100644 index 0000000..439698f --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls.synthetic; + +import org.apache.commons.math3.distribution.AbstractRealDistribution; +import org.apache.commons.math3.distribution.LogNormalDistribution; +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.tools.rumen.JobStory; +import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.JobClass; +import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace; + +/** + * This is a class that represent a class of Jobs. It is used to generate an + * individual job, by picking random durations, task counts, container size, + * etc. + */ +public class SynthJobClass { + + private final JDKRandomGenerator rand; + private final LogNormalDistribution dur; + private final LogNormalDistribution mapRuntime; + private final LogNormalDistribution redRuntime; + private final LogNormalDistribution mtasks; + private final LogNormalDistribution rtasks; + private final LogNormalDistribution mapMem; + private final LogNormalDistribution redMem; + private final LogNormalDistribution mapVcores; + private final LogNormalDistribution redVcores; + + private final Trace trace; + @SuppressWarnings("VisibilityModifier") + protected final SynthWorkload workload; + @SuppressWarnings("VisibilityModifier") + protected final JobClass jobClass; + + public SynthJobClass(JDKRandomGenerator rand, Trace trace, + SynthWorkload workload, int classId) { + + this.trace = trace; + this.workload = workload; + this.rand = new JDKRandomGenerator(); + this.rand.setSeed(rand.nextLong()); + jobClass = trace.workloads.get(workload.getId()).job_classes.get(classId); + + this.dur = SynthUtils.getLogNormalDist(rand, jobClass.dur_avg, + jobClass.dur_stddev); + this.mapRuntime = SynthUtils.getLogNormalDist(rand, jobClass.mtime_avg, + jobClass.mtime_stddev); + this.redRuntime = SynthUtils.getLogNormalDist(rand, jobClass.rtime_avg, + jobClass.rtime_stddev); + this.mtasks = SynthUtils.getLogNormalDist(rand, jobClass.mtasks_avg, + jobClass.mtasks_stddev); + this.rtasks = SynthUtils.getLogNormalDist(rand, jobClass.rtasks_avg, + jobClass.rtasks_stddev); + + this.mapMem = SynthUtils.getLogNormalDist(rand, jobClass.map_max_memory_avg, + jobClass.map_max_memory_stddev); + this.redMem = SynthUtils.getLogNormalDist(rand, + jobClass.reduce_max_memory_avg, jobClass.reduce_max_memory_stddev); + this.mapVcores = SynthUtils.getLogNormalDist(rand, + jobClass.map_max_vcores_avg, jobClass.map_max_vcores_stddev); + this.redVcores = SynthUtils.getLogNormalDist(rand, + jobClass.reduce_max_vcores_avg, jobClass.reduce_max_vcores_stddev); + } + + public JobStory getJobStory(Configuration conf, long actualSubmissionTime) { + return new SynthJob(rand, conf, this, actualSubmissionTime); + } + + @Override + public String toString() { + return "SynthJobClass [workload=" + workload.getName() + ", class=" + + jobClass.class_name + " job_count=" + 
jobClass.class_weight + ", dur=" + + ((dur != null) ? dur.getNumericalMean() : 0) + ", mapRuntime=" + + ((mapRuntime != null) ? mapRuntime.getNumericalMean() : 0) + + ", redRuntime=" + + ((redRuntime != null) ? redRuntime.getNumericalMean() : 0) + + ", mtasks=" + ((mtasks != null) ? mtasks.getNumericalMean() : 0) + + ", rtasks=" + ((rtasks != null) ? rtasks.getNumericalMean() : 0) + + ", chance_of_reservation=" + jobClass.chance_of_reservation + "]\n"; + + } + + public double getClassWeight() { + return jobClass.class_weight; + } + + public long getDur() { + return genLongSample(dur); + } + + public int getMtasks() { + return genIntSample(mtasks); + } + + public int getRtasks() { + return genIntSample(rtasks); + } + + public long getMapMaxMemory() { + return genLongSample(mapMem); + } + + public long getReduceMaxMemory() { + return genLongSample(redMem); + } + + public long getMapMaxVcores() { + return genLongSample(mapVcores); + } + + public long getReduceMaxVcores() { + return genLongSample(redVcores); + } + + public SynthWorkload getWorkload() { + return workload; + } + + public int genIntSample(AbstractRealDistribution dist) { + if (dist == null) { + return 0; + } + double baseSample = dist.sample(); + if (baseSample < 0) { + baseSample = 0; + } + return (int) (Integer.MAX_VALUE & (long) Math.ceil(baseSample)); + } + + public long genLongSample(AbstractRealDistribution dist) { + return dist != null ? (long) Math.ceil(dist.sample()) : 0; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SynthJobClass)) { + return false; + } + SynthJobClass o = (SynthJobClass) other; + return workload.equals(o.workload); + } + + @Override + public int hashCode() { + return workload.hashCode() * workload.getId(); + } + + public String getClassName() { + return jobClass.class_name; + } + + public long getMapTimeSample() { + return genLongSample(mapRuntime); + } + + public long getReduceTimeSample() { + return genLongSample(redRuntime); + } + + public String getUserName() { + return jobClass.user_name; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java new file mode 100644 index 0000000..14b0371 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java @@ -0,0 +1,319 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.sls.synthetic; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.tools.rumen.JobStory; +import org.apache.hadoop.tools.rumen.JobStoryProducer; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.ObjectMapper; + +import javax.xml.bind.annotation.XmlRootElement; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.codehaus.jackson.JsonParser.Feature.INTERN_FIELD_NAMES; +import static org.codehaus.jackson.map.DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES; + +/** + * This is a JobStoryProducer that operates from distribution of different + * workloads. The .json input file is used to determine how many jobs, which + * size, number of maps/reducers and their duration, as well as the temporal + * distributed of submissions. For each parameter we control avg and stdev, and + * generate values via normal or log-normal distributions. + */ +public class SynthTraceJobProducer implements JobStoryProducer { + + @SuppressWarnings("StaticVariableName") + private static final Log LOG = LogFactory.getLog(SynthTraceJobProducer.class); + + private final Configuration conf; + private final AtomicInteger numJobs; + private final Trace trace; + private final long seed; + + private int totalWeight; + private final List<Double> weightList; + private final Map<Integer, SynthWorkload> workloads; + + private final Queue<StoryParams> listStoryParams; + + private final JDKRandomGenerator rand; + + public static final String SLS_SYNTHETIC_TRACE_FILE = + "sls.synthetic" + ".trace_file"; + + public SynthTraceJobProducer(Configuration conf) throws IOException { + this(conf, new Path(conf.get(SLS_SYNTHETIC_TRACE_FILE))); + } + + public SynthTraceJobProducer(Configuration conf, Path path) + throws IOException { + + LOG.info("SynthTraceJobProducer"); + + this.conf = conf; + this.rand = new JDKRandomGenerator(); + workloads = new HashMap<Integer, SynthWorkload>(); + weightList = new ArrayList<Double>(); + + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(INTERN_FIELD_NAMES, true); + mapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false); + + FileSystem ifs = path.getFileSystem(conf); + FSDataInputStream fileIn = ifs.open(path); + + this.trace = mapper.readValue(fileIn, Trace.class); + seed = trace.rand_seed; + rand.setSeed(seed); + + this.numJobs = new AtomicInteger(trace.num_jobs); + + for (int workloadId = 0; workloadId < trace.workloads + .size(); workloadId++) { + SynthWorkload workload = new SynthWorkload(workloadId, trace); + for (int classId = + 0; classId < trace.workloads.get(workloadId).job_classes + .size(); classId++) { + SynthJobClass cls = new SynthJobClass(rand, trace, workload, classId); + workload.add(cls); + } + workloads.put(workloadId, workload); + } + + for (int i = 0; i < workloads.size(); i++) { + double w = workloads.get(i).getWorkloadWeight(); + totalWeight += w; + weightList.add(w); + } + + // create priority queue to keep start-time sorted + listStoryParams = + new PriorityQueue<>(10, new Comparator<StoryParams>() { + @Override + public int compare(StoryParams o1, StoryParams o2) { + long value = o2.actualSubmissionTime - o1.actualSubmissionTime; + if 
((int)value != value) { + throw new ArithmeticException("integer overflow"); + } + return (int)value; + } + }); + + // initialize it + createStoryParams(); + LOG.info("Generated " + listStoryParams.size() + " deadlines for " + + this.numJobs.get() + " jobs "); + } + + public long getSeed() { + return seed; + } + + public int getNodesPerRack() { + return trace.nodes_per_rack < 1 ? 1: trace.nodes_per_rack; + } + + public int getNumNodes() { + return trace.num_nodes; + } + + /** + * Class used to parse a trace configuration file. + */ + @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" }) + @XmlRootElement + public static class Trace { + @JsonProperty("description") + String description; + @JsonProperty("num_nodes") + int num_nodes; + @JsonProperty("nodes_per_rack") + int nodes_per_rack; + @JsonProperty("num_jobs") + int num_jobs; + + // in sec (selects a portion of time_distribution + @JsonProperty("rand_seed") + long rand_seed; + @JsonProperty("workloads") + List<Workload> workloads; + + } + + /** + * Class used to parse a workload from file. + */ + @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" }) + public static class Workload { + @JsonProperty("workload_name") + String workload_name; + // used to change probability this workload is picked for each job + @JsonProperty("workload_weight") + double workload_weight; + @JsonProperty("queue_name") + String queue_name; + @JsonProperty("job_classes") + List<JobClass> job_classes; + @JsonProperty("time_distribution") + List<TimeSample> time_distribution; + } + + /** + * Class used to parse a job class from file. + */ + @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" }) + public static class JobClass { + + @JsonProperty("class_name") + String class_name; + @JsonProperty("user_name") + String user_name; + + // used to change probability this class is chosen + @JsonProperty("class_weight") + double class_weight; + + // reservation related params + @JsonProperty("chance_of_reservation") + double chance_of_reservation; + @JsonProperty("deadline_factor_avg") + double deadline_factor_avg; + @JsonProperty("deadline_factor_stddev") + double deadline_factor_stddev; + + // durations in sec + @JsonProperty("dur_avg") + double dur_avg; + @JsonProperty("dur_stddev") + double dur_stddev; + @JsonProperty("mtime_avg") + double mtime_avg; + @JsonProperty("mtime_stddev") + double mtime_stddev; + @JsonProperty("rtime_avg") + double rtime_avg; + @JsonProperty("rtime_stddev") + double rtime_stddev; + + // number of tasks + @JsonProperty("mtasks_avg") + double mtasks_avg; + @JsonProperty("mtasks_stddev") + double mtasks_stddev; + @JsonProperty("rtasks_avg") + double rtasks_avg; + @JsonProperty("rtasks_stddev") + double rtasks_stddev; + + // memory in MB + @JsonProperty("map_max_memory_avg") + long map_max_memory_avg; + @JsonProperty("map_max_memory_stddev") + double map_max_memory_stddev; + @JsonProperty("reduce_max_memory_avg") + long reduce_max_memory_avg; + @JsonProperty("reduce_max_memory_stddev") + double reduce_max_memory_stddev; + + // vcores + @JsonProperty("map_max_vcores_avg") + long map_max_vcores_avg; + @JsonProperty("map_max_vcores_stddev") + double map_max_vcores_stddev; + @JsonProperty("reduce_max_vcores_avg") + long reduce_max_vcores_avg; + @JsonProperty("reduce_max_vcores_stddev") + double reduce_max_vcores_stddev; + + } + + /** + * This is used to define time-varying probability of a job start-time (e.g., + * to simulate daily patterns). 
+ */ + @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" }) + public static class TimeSample { + // in sec + @JsonProperty("time") + int time; + @JsonProperty("weight") + double jobs; + } + + static class StoryParams { + private SynthJobClass pickedJobClass; + private long actualSubmissionTime; + + StoryParams(SynthJobClass pickedJobClass, long actualSubmissionTime) { + this.pickedJobClass = pickedJobClass; + this.actualSubmissionTime = actualSubmissionTime; + } + } + + + void createStoryParams() { + + for (int i = 0; i < numJobs.get(); i++) { + int workload = SynthUtils.getWeighted(weightList, rand); + SynthWorkload pickedWorkload = workloads.get(workload); + long jobClass = + SynthUtils.getWeighted(pickedWorkload.getWeightList(), rand); + SynthJobClass pickedJobClass = + pickedWorkload.getClassList().get((int) jobClass); + long actualSubmissionTime = pickedWorkload.getBaseSubmissionTime(rand); + // long actualSubmissionTime = (i + 1) * 10; + listStoryParams + .add(new StoryParams(pickedJobClass, actualSubmissionTime)); + } + } + + @Override + public JobStory getNextJob() throws IOException { + if (numJobs.decrementAndGet() < 0) { + return null; + } + StoryParams storyParams = listStoryParams.poll(); + return storyParams.pickedJobClass.getJobStory(conf, + storyParams.actualSubmissionTime); + } + + @Override + public void close() { + } + + @Override + public String toString() { + return "SynthTraceJobProducer [ conf=" + conf + ", numJobs=" + numJobs + + ", weightList=" + weightList + ", r=" + rand + ", totalWeight=" + + totalWeight + ", workloads=" + workloads + "]"; + } + + public int getNumJobs() { + return trace.num_jobs; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthUtils.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthUtils.java new file mode 100644 index 0000000..a7f8c7f --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthUtils.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls.synthetic; + +import org.apache.commons.math3.distribution.LogNormalDistribution; +import org.apache.commons.math3.distribution.NormalDistribution; +import org.apache.commons.math3.random.JDKRandomGenerator; + +import java.util.Collection; +import java.util.Random; + +/** + * Utils for the Synthetic generator. 
+ */ +public final class SynthUtils { + + private SynthUtils(){ + //class is not meant to be instantiated + } + + public static int getWeighted(Collection<Double> weights, Random rr) { + + double totalWeight = 0; + for (Double i : weights) { + totalWeight += i; + } + + double rand = rr.nextDouble() * totalWeight; + + double cur = 0; + int ind = 0; + for (Double i : weights) { + cur += i; + if (cur > rand) { + break; + } + ind++; + } + + return ind; + } + + public static NormalDistribution getNormalDist(JDKRandomGenerator rand, + double average, double stdDev) { + + if (average <= 0) { + return null; + } + + // set default for missing param + if (stdDev == 0) { + stdDev = average / 6; + } + + NormalDistribution ret = new NormalDistribution(average, stdDev, + NormalDistribution.DEFAULT_INVERSE_ABSOLUTE_ACCURACY); + ret.reseedRandomGenerator(rand.nextLong()); + return ret; + } + + public static LogNormalDistribution getLogNormalDist(JDKRandomGenerator rand, + double mean, double stdDev) { + + if (mean <= 0) { + return null; + } + + // set default for missing param + if (stdDev == 0) { + stdDev = mean / 6; + } + + // derive lognormal parameters for X = LogNormal(mu, sigma) + // sigma^2 = ln (1+Var[X]/(E[X])^2) + // mu = ln(E[X]) - 1/2 * sigma^2 + double var = stdDev * stdDev; + double sigmasq = Math.log1p(var / (mean * mean)); + double sigma = Math.sqrt(sigmasq); + double mu = Math.log(mean) - 0.5 * sigmasq; + + LogNormalDistribution ret = new LogNormalDistribution(mu, sigma, + LogNormalDistribution.DEFAULT_INVERSE_ABSOLUTE_ACCURACY); + ret.reseedRandomGenerator(rand.nextLong()); + return ret; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java new file mode 100644 index 0000000..9e5fd4e --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls.synthetic; + +import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace; + +import java.util.*; + +/** + * This class represents a workload (made up of multiple SynthJobClass(es)). It + * also stores the temporal distributions of jobs in this workload.
+ */ +public class SynthWorkload { + + private final int id; + private final List<SynthJobClass> classList; + private final Trace trace; + private final SortedMap<Integer, Double> timeWeights; + + public SynthWorkload(int identifier, Trace inTrace) { + classList = new ArrayList<SynthJobClass>(); + this.id = identifier; + this.trace = inTrace; + timeWeights = new TreeMap<Integer, Double>(); + for (SynthTraceJobProducer.TimeSample ts : trace.workloads + .get(id).time_distribution) { + timeWeights.put(ts.time, ts.jobs); + } + } + + public boolean add(SynthJobClass s) { + return classList.add(s); + } + + public List<Double> getWeightList() { + ArrayList<Double> ret = new ArrayList<Double>(); + for (SynthJobClass s : classList) { + ret.add(s.getClassWeight()); + } + return ret; + } + + public int getId() { + return id; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SynthWorkload)) { + return false; + } + // assume ID determines job classes by construction + return getId() == ((SynthWorkload) other).getId(); + } + + @Override + public int hashCode() { + return getId(); + } + + @Override + public String toString() { + return "SynthWorkload " + trace.workloads.get(id).workload_name + "[\n" + + classList + "]\n"; + } + + public String getName() { + return trace.workloads.get(id).workload_name; + } + + public double getWorkloadWeight() { + return trace.workloads.get(id).workload_weight; + } + + public String getQueueName() { + return trace.workloads.get(id).queue_name; + } + + public long getBaseSubmissionTime(Random rand) { + + // pick based on weights the "bucket" for this start time + int position = SynthUtils.getWeighted(timeWeights.values(), rand); + + int[] time = new int[timeWeights.keySet().size()]; + int index = 0; + for (Integer i : timeWeights.keySet()) { + time[index++] = i; + } + + // uniformly pick a time between start and end time of this bucket + int startRange = time[position]; + int endRange = startRange; + // if there is no subsequent bucket pick startRange + if (position < timeWeights.keySet().size() - 1) { + endRange = time[position + 1]; + return startRange + rand.nextInt((endRange - startRange)); + } else { + return startRange; + } + } + + public List<SynthJobClass> getClassList() { + return classList; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/package-info.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/package-info.java new file mode 100644 index 0000000..e069610 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Classes comprising the synthetic load generator for SLS. + */ +package org.apache.hadoop.yarn.sls.synthetic; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a3929f2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java index f1b4f07..dbc2dab 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java @@ -28,6 +28,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -37,12 +39,11 @@ import org.apache.hadoop.tools.rumen.JobTraceReader; import org.apache.hadoop.tools.rumen.LoggedJob; import org.apache.hadoop.tools.rumen.LoggedTask; import org.apache.hadoop.tools.rumen.LoggedTaskAttempt; -import org.codehaus.jackson.JsonFactory; -import org.codehaus.jackson.map.ObjectMapper; @Private @Unstable public class SLSUtils { + public final static String DEFAULT_JOB_TYPE = "mapreduce"; // hostname includes the network path and the host name. for example // "/default-rack/hostFoo" or "/coreSwitchA/TORSwitchB/hostBar". 
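The hunks that follow rework how SLSUtils derives the node set from an SLS trace: the inline "job.tasks"/"container.host" parsing moves into an addNodes(...) helper, and a new generateNodes(numNodes, numRacks) helper synthesizes rack/node names whenever a trace entry carries "num.nodes" (and optionally "num.racks"). A rough, self-contained sketch of that naming convention follows; the class name and the sizes are hypothetical, and this is an illustration only, not part of the patch.

import java.util.HashSet;
import java.util.Set;

public class NodeNamingSketch {
  public static void main(String[] args) {
    int numNodes = 4;   // hypothetical cluster size
    int numRacks = 2;   // hypothetical rack count
    Set<String> nodes = new HashSet<>();
    for (int i = 0; i < numNodes; i++) {
      // same "/rack<r>/node<i>" convention as the generateNodes helper below
      nodes.add("/rack" + i % numRacks + "/node" + i);
    }
    // prints something like: [/rack0/node0, /rack1/node1, /rack0/node2, /rack1/node3]
    System.out.println(nodes);
  }
}

The real helper additionally clamps numRacks to at least 1 and at most numNodes, as shown in the hunk below.
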
@@ -100,22 +101,15 @@ public class SLSUtils { */ public static Set<String> parseNodesFromSLSTrace(String jobTrace) throws IOException { - Set<String> nodeSet = new HashSet<String>(); + Set<String> nodeSet = new HashSet<>(); JsonFactory jsonF = new JsonFactory(); ObjectMapper mapper = new ObjectMapper(); Reader input = new InputStreamReader(new FileInputStream(jobTrace), "UTF-8"); try { - Iterator<Map> i = mapper.readValues( - jsonF.createJsonParser(input), Map.class); + Iterator<Map> i = mapper.readValues(jsonF.createParser(input), Map.class); while (i.hasNext()) { - Map jsonE = i.next(); - List tasks = (List) jsonE.get("job.tasks"); - for (Object o : tasks) { - Map jsonTask = (Map) o; - String hostname = jsonTask.get("container.host").toString(); - nodeSet.add(hostname); - } + addNodes(nodeSet, i.next()); } } finally { input.close(); @@ -123,6 +117,29 @@ public class SLSUtils { return nodeSet; } + private static void addNodes(Set<String> nodeSet, Map jsonEntry) { + if (jsonEntry.containsKey("num.nodes")) { + int numNodes = Integer.parseInt(jsonEntry.get("num.nodes").toString()); + int numRacks = 1; + if (jsonEntry.containsKey("num.racks")) { + numRacks = Integer.parseInt( + jsonEntry.get("num.racks").toString()); + } + nodeSet.addAll(generateNodes(numNodes, numRacks)); + } + + if (jsonEntry.containsKey("job.tasks")) { + List tasks = (List) jsonEntry.get("job.tasks"); + for (Object o : tasks) { + Map jsonTask = (Map) o; + String hostname = (String) jsonTask.get("container.host"); + if (hostname != null) { + nodeSet.add(hostname); + } + } + } + } + /** * parse the input node file, return each host name */ @@ -134,8 +151,7 @@ public class SLSUtils { Reader input = new InputStreamReader(new FileInputStream(nodeFile), "UTF-8"); try { - Iterator<Map> i = mapper.readValues( - jsonF.createJsonParser(input), Map.class); + Iterator<Map> i = mapper.readValues(jsonF.createParser(input), Map.class); while (i.hasNext()) { Map jsonE = i.next(); String rack = "/" + jsonE.get("rack"); @@ -150,4 +166,21 @@ public class SLSUtils { } return nodeSet; } + + public static Set<? extends String> generateNodes(int numNodes, + int numRacks){ + Set<String> nodeSet = new HashSet<>(); + if (numRacks < 1) { + numRacks = 1; + } + + if (numRacks > numNodes) { + numRacks = numNodes; + } + + for (int i = 0; i < numNodes; i++) { + nodeSet.add("/rack" + i % numRacks + "/node" + i); + } + return nodeSet; + } }
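For readers who want to try the synthetic generator end to end, a minimal usage sketch follows. It assumes a trace .json matching the Trace/Workload/JobClass/TimeSample schema above already exists at a hypothetical path /tmp/syn.json; the class name SynthTraceDemo is likewise invented for illustration and is not part of this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;

public class SynthTraceDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // property name defined in SynthTraceJobProducer.SLS_SYNTHETIC_TRACE_FILE
    conf.set(SynthTraceJobProducer.SLS_SYNTHETIC_TRACE_FILE, "/tmp/syn.json");
    SynthTraceJobProducer producer = new SynthTraceJobProducer(conf);
    System.out.println("jobs=" + producer.getNumJobs()
        + ", seed=" + producer.getSeed());
    JobStory job;
    // getNextJob() returns null once num_jobs stories have been handed out
    while ((job = producer.getNextJob()) != null) {
      System.out.println(job.getName() + " submitted at "
          + job.getSubmissionTime() + " with " + job.getNumberMaps()
          + " maps and " + job.getNumberReduces() + " reduces");
    }
    producer.close();
  }
}

Note that the producer parses the trace at construction time, so the file must be readable through the Hadoop FileSystem resolved from the Configuration passed in.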