http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java new file mode 100644 index 0000000..4c8d745 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java @@ -0,0 +1,784 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.file.tfile.TFile; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptFinishDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptStartDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationFinishDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationStartDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerFinishDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerStartDataProto; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptFinishDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptStartDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationFinishDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationStartDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerFinishDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerStartDataPBImpl; +import org.apache.hadoop.yarn.util.ConverterUtils; + +import com.google.protobuf.InvalidProtocolBufferException; + +/** + * File system implementation of {@link ApplicationHistoryStore}. In this + * implementation, one application will have just one file in the file system, + * which contains all the history data of one application, and its attempts and + * containers. {@link #applicationStarted(ApplicationStartData)} is supposed to + * be invoked first when writing any history data of one application and it will + * open a file, while {@link #applicationFinished(ApplicationFinishData)} is + * supposed to be last writing operation and will close the file. + */ +@Public +@Unstable +public class FileSystemApplicationHistoryStore extends AbstractService + implements ApplicationHistoryStore { + + private static final Log LOG = LogFactory + .getLog(FileSystemApplicationHistoryStore.class); + + private static final String ROOT_DIR_NAME = "ApplicationHistoryDataRoot"; + private static final int MIN_BLOCK_SIZE = 256 * 1024; + private static final String START_DATA_SUFFIX = "_start"; + private static final String FINISH_DATA_SUFFIX = "_finish"; + private static final FsPermission ROOT_DIR_UMASK = FsPermission + .createImmutable((short) 0740); + private static final FsPermission HISTORY_FILE_UMASK = FsPermission + .createImmutable((short) 0640); + + private FileSystem fs; + private Path rootDirPath; + + private ConcurrentMap<ApplicationId, HistoryFileWriter> outstandingWriters = + new ConcurrentHashMap<ApplicationId, HistoryFileWriter>(); + + public FileSystemApplicationHistoryStore() { + super(FileSystemApplicationHistoryStore.class.getName()); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + Path fsWorkingPath = + new Path(conf.get(YarnConfiguration.FS_APPLICATION_HISTORY_STORE_URI)); + rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME); + try { + fs = fsWorkingPath.getFileSystem(conf); + fs.mkdirs(rootDirPath); + fs.setPermission(rootDirPath, ROOT_DIR_UMASK); + } catch (IOException e) { + LOG.error("Error when initializing FileSystemHistoryStorage", e); + throw e; + } + super.serviceInit(conf); + } + + @Override + public void serviceStop() throws Exception { + try { + for (Entry<ApplicationId, HistoryFileWriter> entry : outstandingWriters + .entrySet()) { + entry.getValue().close(); + } + outstandingWriters.clear(); + } finally { + IOUtils.cleanup(LOG, fs); + } + super.serviceStop(); + } + + @Override + public ApplicationHistoryData getApplication(ApplicationId appId) + throws IOException { + HistoryFileReader hfReader = getHistoryFileReader(appId); + try { + boolean readStartData = false; + boolean readFinishData = false; + ApplicationHistoryData historyData = + ApplicationHistoryData.newInstance(appId, null, null, null, null, + Long.MIN_VALUE, Long.MIN_VALUE, Long.MAX_VALUE, null, + FinalApplicationStatus.UNDEFINED, null); + while ((!readStartData || !readFinishData) && hfReader.hasNext()) { + HistoryFileReader.Entry entry = hfReader.next(); + if (entry.key.id.equals(appId.toString())) { + if (entry.key.suffix.equals(START_DATA_SUFFIX)) { + ApplicationStartData startData = + parseApplicationStartData(entry.value); + mergeApplicationHistoryData(historyData, startData); + readStartData = true; + } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) { + ApplicationFinishData finishData = + parseApplicationFinishData(entry.value); + mergeApplicationHistoryData(historyData, finishData); + readFinishData = true; + } + } + } + if (!readStartData && !readFinishData) { + return null; + } + if (!readStartData) { + LOG.warn("Start information is missing for application " + appId); + } + if (!readFinishData) { + LOG.warn("Finish information is missing for application " + appId); + } + LOG.info("Completed reading history information of application " + appId); + return historyData; + } catch (IOException e) { + LOG.error("Error when reading history file of application " + appId); + throw e; + } finally { + hfReader.close(); + } + } + + @Override + public Map<ApplicationId, ApplicationHistoryData> getAllApplications() + throws IOException { + Map<ApplicationId, ApplicationHistoryData> historyDataMap = + new HashMap<ApplicationId, ApplicationHistoryData>(); + FileStatus[] files = fs.listStatus(rootDirPath); + for (FileStatus file : files) { + ApplicationId appId = + ConverterUtils.toApplicationId(file.getPath().getName()); + try { + ApplicationHistoryData historyData = getApplication(appId); + if (historyData != null) { + historyDataMap.put(appId, historyData); + } + } catch (IOException e) { + // Eat the exception not to disturb the getting the next + // ApplicationHistoryData + LOG.error("History information of application " + appId + + " is not included into the result due to the exception", e); + } + } + return historyDataMap; + } + + @Override + public Map<ApplicationAttemptId, ApplicationAttemptHistoryData> + getApplicationAttempts(ApplicationId appId) throws IOException { + Map<ApplicationAttemptId, ApplicationAttemptHistoryData> historyDataMap = + new HashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>(); + HistoryFileReader hfReader = getHistoryFileReader(appId); + try { + while (hfReader.hasNext()) { + HistoryFileReader.Entry entry = hfReader.next(); + if (entry.key.id.startsWith( + ConverterUtils.APPLICATION_ATTEMPT_PREFIX)) { + ApplicationAttemptId appAttemptId = + ConverterUtils.toApplicationAttemptId(entry.key.id); + if (appAttemptId.getApplicationId().equals(appId)) { + ApplicationAttemptHistoryData historyData = + historyDataMap.get(appAttemptId); + if (historyData == null) { + historyData = ApplicationAttemptHistoryData.newInstance( + appAttemptId, null, -1, null, null, null, + FinalApplicationStatus.UNDEFINED, null); + historyDataMap.put(appAttemptId, historyData); + } + if (entry.key.suffix.equals(START_DATA_SUFFIX)) { + mergeApplicationAttemptHistoryData(historyData, + parseApplicationAttemptStartData(entry.value)); + } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) { + mergeApplicationAttemptHistoryData(historyData, + parseApplicationAttemptFinishData(entry.value)); + } + } + } + } + LOG.info("Completed reading history information of all application" + + " attempts of application " + appId); + } catch (IOException e) { + LOG.info("Error when reading history information of some application" + + " attempts of application " + appId); + } finally { + hfReader.close(); + } + return historyDataMap; + } + + @Override + public ApplicationAttemptHistoryData getApplicationAttempt( + ApplicationAttemptId appAttemptId) throws IOException { + HistoryFileReader hfReader = + getHistoryFileReader(appAttemptId.getApplicationId()); + try { + boolean readStartData = false; + boolean readFinishData = false; + ApplicationAttemptHistoryData historyData = + ApplicationAttemptHistoryData.newInstance(appAttemptId, null, -1, + null, null, null, FinalApplicationStatus.UNDEFINED, null); + while ((!readStartData || !readFinishData) && hfReader.hasNext()) { + HistoryFileReader.Entry entry = hfReader.next(); + if (entry.key.id.equals(appAttemptId.toString())) { + if (entry.key.suffix.equals(START_DATA_SUFFIX)) { + ApplicationAttemptStartData startData = + parseApplicationAttemptStartData(entry.value); + mergeApplicationAttemptHistoryData(historyData, startData); + readStartData = true; + } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) { + ApplicationAttemptFinishData finishData = + parseApplicationAttemptFinishData(entry.value); + mergeApplicationAttemptHistoryData(historyData, finishData); + readFinishData = true; + } + } + } + if (!readStartData && !readFinishData) { + return null; + } + if (!readStartData) { + LOG.warn("Start information is missing for application attempt " + + appAttemptId); + } + if (!readFinishData) { + LOG.warn("Finish information is missing for application attempt " + + appAttemptId); + } + LOG.info("Completed reading history information of application attempt " + + appAttemptId); + return historyData; + } catch (IOException e) { + LOG.error("Error when reading history file of application attempt" + + appAttemptId); + throw e; + } finally { + hfReader.close(); + } + } + + @Override + public ContainerHistoryData getContainer(ContainerId containerId) + throws IOException { + HistoryFileReader hfReader = + getHistoryFileReader(containerId.getApplicationAttemptId() + .getApplicationId()); + try { + boolean readStartData = false; + boolean readFinishData = false; + ContainerHistoryData historyData = + ContainerHistoryData + .newInstance(containerId, null, null, null, Long.MIN_VALUE, + Long.MAX_VALUE, null, Integer.MAX_VALUE, null); + while ((!readStartData || !readFinishData) && hfReader.hasNext()) { + HistoryFileReader.Entry entry = hfReader.next(); + if (entry.key.id.equals(containerId.toString())) { + if (entry.key.suffix.equals(START_DATA_SUFFIX)) { + ContainerStartData startData = parseContainerStartData(entry.value); + mergeContainerHistoryData(historyData, startData); + readStartData = true; + } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) { + ContainerFinishData finishData = + parseContainerFinishData(entry.value); + mergeContainerHistoryData(historyData, finishData); + readFinishData = true; + } + } + } + if (!readStartData && !readFinishData) { + return null; + } + if (!readStartData) { + LOG.warn("Start information is missing for container " + containerId); + } + if (!readFinishData) { + LOG.warn("Finish information is missing for container " + containerId); + } + LOG.info("Completed reading history information of container " + + containerId); + return historyData; + } catch (IOException e) { + LOG.error("Error when reading history file of container " + containerId); + throw e; + } finally { + hfReader.close(); + } + } + + @Override + public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId) + throws IOException { + ApplicationAttemptHistoryData attemptHistoryData = + getApplicationAttempt(appAttemptId); + if (attemptHistoryData == null + || attemptHistoryData.getMasterContainerId() == null) { + return null; + } + return getContainer(attemptHistoryData.getMasterContainerId()); + } + + @Override + public Map<ContainerId, ContainerHistoryData> getContainers( + ApplicationAttemptId appAttemptId) throws IOException { + Map<ContainerId, ContainerHistoryData> historyDataMap = + new HashMap<ContainerId, ContainerHistoryData>(); + HistoryFileReader hfReader = + getHistoryFileReader(appAttemptId.getApplicationId()); + try { + while (hfReader.hasNext()) { + HistoryFileReader.Entry entry = hfReader.next(); + if (entry.key.id.startsWith(ConverterUtils.CONTAINER_PREFIX)) { + ContainerId containerId = + ConverterUtils.toContainerId(entry.key.id); + if (containerId.getApplicationAttemptId().equals(appAttemptId)) { + ContainerHistoryData historyData = + historyDataMap.get(containerId); + if (historyData == null) { + historyData = ContainerHistoryData.newInstance( + containerId, null, null, null, Long.MIN_VALUE, + Long.MAX_VALUE, null, Integer.MAX_VALUE, null); + historyDataMap.put(containerId, historyData); + } + if (entry.key.suffix.equals(START_DATA_SUFFIX)) { + mergeContainerHistoryData(historyData, + parseContainerStartData(entry.value)); + } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) { + mergeContainerHistoryData(historyData, + parseContainerFinishData(entry.value)); + } + } + } + } + LOG.info("Completed reading history information of all conatiners" + + " of application attempt " + appAttemptId); + } catch (IOException e) { + LOG.info("Error when reading history information of some containers" + + " of application attempt " + appAttemptId); + } finally { + hfReader.close(); + } + return historyDataMap; + } + + @Override + public void applicationStarted(ApplicationStartData appStart) + throws IOException { + HistoryFileWriter hfWriter = + outstandingWriters.get(appStart.getApplicationId()); + if (hfWriter == null) { + Path applicationHistoryFile = + new Path(rootDirPath, appStart.getApplicationId().toString()); + try { + hfWriter = new HistoryFileWriter(applicationHistoryFile); + LOG.info("Opened history file of application " + + appStart.getApplicationId()); + } catch (IOException e) { + LOG.error("Error when openning history file of application " + + appStart.getApplicationId()); + throw e; + } + outstandingWriters.put(appStart.getApplicationId(), hfWriter); + } else { + throw new IOException("History file of application " + + appStart.getApplicationId() + " is already opened"); + } + assert appStart instanceof ApplicationStartDataPBImpl; + try { + hfWriter.writeHistoryData(new HistoryDataKey(appStart.getApplicationId() + .toString(), START_DATA_SUFFIX), + ((ApplicationStartDataPBImpl) appStart).getProto().toByteArray()); + LOG.info("Start information of application " + + appStart.getApplicationId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing start information of application " + + appStart.getApplicationId()); + throw e; + } + } + + @Override + public void applicationFinished(ApplicationFinishData appFinish) + throws IOException { + HistoryFileWriter hfWriter = + getHistoryFileWriter(appFinish.getApplicationId()); + assert appFinish instanceof ApplicationFinishDataPBImpl; + try { + hfWriter.writeHistoryData(new HistoryDataKey(appFinish.getApplicationId() + .toString(), FINISH_DATA_SUFFIX), + ((ApplicationFinishDataPBImpl) appFinish).getProto().toByteArray()); + LOG.info("Finish information of application " + + appFinish.getApplicationId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing finish information of application " + + appFinish.getApplicationId()); + throw e; + } finally { + hfWriter.close(); + outstandingWriters.remove(appFinish.getApplicationId()); + } + } + + @Override + public void applicationAttemptStarted( + ApplicationAttemptStartData appAttemptStart) throws IOException { + HistoryFileWriter hfWriter = + getHistoryFileWriter(appAttemptStart.getApplicationAttemptId() + .getApplicationId()); + assert appAttemptStart instanceof ApplicationAttemptStartDataPBImpl; + try { + hfWriter.writeHistoryData(new HistoryDataKey(appAttemptStart + .getApplicationAttemptId().toString(), START_DATA_SUFFIX), + ((ApplicationAttemptStartDataPBImpl) appAttemptStart).getProto() + .toByteArray()); + LOG.info("Start information of application attempt " + + appAttemptStart.getApplicationAttemptId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing start information of application attempt " + + appAttemptStart.getApplicationAttemptId()); + throw e; + } + } + + @Override + public void applicationAttemptFinished( + ApplicationAttemptFinishData appAttemptFinish) throws IOException { + HistoryFileWriter hfWriter = + getHistoryFileWriter(appAttemptFinish.getApplicationAttemptId() + .getApplicationId()); + assert appAttemptFinish instanceof ApplicationAttemptFinishDataPBImpl; + try { + hfWriter.writeHistoryData(new HistoryDataKey(appAttemptFinish + .getApplicationAttemptId().toString(), FINISH_DATA_SUFFIX), + ((ApplicationAttemptFinishDataPBImpl) appAttemptFinish).getProto() + .toByteArray()); + LOG.info("Finish information of application attempt " + + appAttemptFinish.getApplicationAttemptId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing finish information of application attempt " + + appAttemptFinish.getApplicationAttemptId()); + throw e; + } + } + + @Override + public void containerStarted(ContainerStartData containerStart) + throws IOException { + HistoryFileWriter hfWriter = + getHistoryFileWriter(containerStart.getContainerId() + .getApplicationAttemptId().getApplicationId()); + assert containerStart instanceof ContainerStartDataPBImpl; + try { + hfWriter.writeHistoryData(new HistoryDataKey(containerStart + .getContainerId().toString(), START_DATA_SUFFIX), + ((ContainerStartDataPBImpl) containerStart).getProto().toByteArray()); + LOG.info("Start information of container " + + containerStart.getContainerId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing start information of container " + + containerStart.getContainerId()); + throw e; + } + } + + @Override + public void containerFinished(ContainerFinishData containerFinish) + throws IOException { + HistoryFileWriter hfWriter = + getHistoryFileWriter(containerFinish.getContainerId() + .getApplicationAttemptId().getApplicationId()); + assert containerFinish instanceof ContainerFinishDataPBImpl; + try { + hfWriter.writeHistoryData(new HistoryDataKey(containerFinish + .getContainerId().toString(), FINISH_DATA_SUFFIX), + ((ContainerFinishDataPBImpl) containerFinish).getProto().toByteArray()); + LOG.info("Finish information of container " + + containerFinish.getContainerId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing finish information of container " + + containerFinish.getContainerId()); + } + } + + private static ApplicationStartData parseApplicationStartData(byte[] value) + throws InvalidProtocolBufferException { + return new ApplicationStartDataPBImpl( + ApplicationStartDataProto.parseFrom(value)); + } + + private static ApplicationFinishData parseApplicationFinishData(byte[] value) + throws InvalidProtocolBufferException { + return new ApplicationFinishDataPBImpl( + ApplicationFinishDataProto.parseFrom(value)); + } + + private static ApplicationAttemptStartData parseApplicationAttemptStartData( + byte[] value) throws InvalidProtocolBufferException { + return new ApplicationAttemptStartDataPBImpl( + ApplicationAttemptStartDataProto.parseFrom(value)); + } + + private static ApplicationAttemptFinishData + parseApplicationAttemptFinishData(byte[] value) + throws InvalidProtocolBufferException { + return new ApplicationAttemptFinishDataPBImpl( + ApplicationAttemptFinishDataProto.parseFrom(value)); + } + + private static ContainerStartData parseContainerStartData(byte[] value) + throws InvalidProtocolBufferException { + return new ContainerStartDataPBImpl( + ContainerStartDataProto.parseFrom(value)); + } + + private static ContainerFinishData parseContainerFinishData(byte[] value) + throws InvalidProtocolBufferException { + return new ContainerFinishDataPBImpl( + ContainerFinishDataProto.parseFrom(value)); + } + + private static void mergeApplicationHistoryData( + ApplicationHistoryData historyData, ApplicationStartData startData) { + historyData.setApplicationName(startData.getApplicationName()); + historyData.setApplicationType(startData.getApplicationType()); + historyData.setQueue(startData.getQueue()); + historyData.setUser(startData.getUser()); + historyData.setSubmitTime(startData.getSubmitTime()); + historyData.setStartTime(startData.getStartTime()); + } + + private static void mergeApplicationHistoryData( + ApplicationHistoryData historyData, ApplicationFinishData finishData) { + historyData.setFinishTime(finishData.getFinishTime()); + historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo()); + historyData.setFinalApplicationStatus(finishData + .getFinalApplicationStatus()); + historyData.setYarnApplicationState(finishData.getYarnApplicationState()); + } + + private static void mergeApplicationAttemptHistoryData( + ApplicationAttemptHistoryData historyData, + ApplicationAttemptStartData startData) { + historyData.setHost(startData.getHost()); + historyData.setRPCPort(startData.getRPCPort()); + historyData.setMasterContainerId(startData.getMasterContainerId()); + } + + private static void mergeApplicationAttemptHistoryData( + ApplicationAttemptHistoryData historyData, + ApplicationAttemptFinishData finishData) { + historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo()); + historyData.setTrackingURL(finishData.getTrackingURL()); + historyData.setFinalApplicationStatus(finishData + .getFinalApplicationStatus()); + historyData.setYarnApplicationAttemptState(finishData + .getYarnApplicationAttemptState()); + } + + private static void mergeContainerHistoryData( + ContainerHistoryData historyData, ContainerStartData startData) { + historyData.setAllocatedResource(startData.getAllocatedResource()); + historyData.setAssignedNode(startData.getAssignedNode()); + historyData.setPriority(startData.getPriority()); + historyData.setStartTime(startData.getStartTime()); + } + + private static void mergeContainerHistoryData( + ContainerHistoryData historyData, ContainerFinishData finishData) { + historyData.setFinishTime(finishData.getFinishTime()); + historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo()); + historyData.setContainerExitStatus(finishData.getContainerExitStatus()); + historyData.setContainerState(finishData.getContainerState()); + } + + private HistoryFileWriter getHistoryFileWriter(ApplicationId appId) + throws IOException { + HistoryFileWriter hfWriter = outstandingWriters.get(appId); + if (hfWriter == null) { + throw new IOException("History file of application " + appId + + " is not opened"); + } + return hfWriter; + } + + private HistoryFileReader getHistoryFileReader(ApplicationId appId) + throws IOException { + Path applicationHistoryFile = new Path(rootDirPath, appId.toString()); + if (!fs.exists(applicationHistoryFile)) { + throw new IOException("History file for application " + appId + + " is not found"); + } + // The history file is still under writing + if (outstandingWriters.containsKey(appId)) { + throw new IOException("History file for application " + appId + + " is under writing"); + } + return new HistoryFileReader(applicationHistoryFile); + } + + private class HistoryFileReader { + + private class Entry { + + private HistoryDataKey key; + private byte[] value; + + public Entry(HistoryDataKey key, byte[] value) { + this.key = key; + this.value = value; + } + } + + private TFile.Reader reader; + private TFile.Reader.Scanner scanner; + + public HistoryFileReader(Path historyFile) throws IOException { + FSDataInputStream fsdis = fs.open(historyFile); + reader = + new TFile.Reader(fsdis, fs.getFileStatus(historyFile).getLen(), + getConfig()); + reset(); + } + + public boolean hasNext() { + return !scanner.atEnd(); + } + + public Entry next() throws IOException { + TFile.Reader.Scanner.Entry entry = scanner.entry(); + DataInputStream dis = entry.getKeyStream(); + HistoryDataKey key = new HistoryDataKey(); + key.readFields(dis); + dis = entry.getValueStream(); + byte[] value = new byte[entry.getValueLength()]; + dis.read(value); + scanner.advance(); + return new Entry(key, value); + } + + public void reset() throws IOException { + IOUtils.cleanup(LOG, scanner); + scanner = reader.createScanner(); + } + + public void close() { + IOUtils.cleanup(LOG, scanner, reader); + } + + } + + private class HistoryFileWriter { + + private FSDataOutputStream fsdos; + private TFile.Writer writer; + + public HistoryFileWriter(Path historyFile) throws IOException { + if (fs.exists(historyFile)) { + fsdos = fs.append(historyFile); + } else { + fsdos = fs.create(historyFile); + } + fs.setPermission(historyFile, HISTORY_FILE_UMASK); + writer = + new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get( + YarnConfiguration.FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE, + YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE), null, + getConfig()); + } + + public synchronized void close() { + IOUtils.cleanup(LOG, writer, fsdos); + } + + public synchronized void writeHistoryData(HistoryDataKey key, byte[] value) + throws IOException { + DataOutputStream dos = null; + try { + dos = writer.prepareAppendKey(-1); + key.write(dos); + } finally { + IOUtils.cleanup(LOG, dos); + } + try { + dos = writer.prepareAppendValue(value.length); + dos.write(value); + } finally { + IOUtils.cleanup(LOG, dos); + } + } + + } + + private static class HistoryDataKey implements Writable { + + private String id; + + private String suffix; + + public HistoryDataKey() { + this(null, null); + } + + public HistoryDataKey(String id, String suffix) { + this.id = id; + this.suffix = suffix; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(id); + out.writeUTF(suffix); + } + + @Override + public void readFields(DataInput in) throws IOException { + id = in.readUTF(); + suffix = in.readUTF(); + } + } +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java new file mode 100644 index 0000000..c226ad3 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java @@ -0,0 +1,274 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; + +/** + * In-memory implementation of {@link ApplicationHistoryStore}. This + * implementation is for test purpose only. If users improperly instantiate it, + * they may encounter reading and writing history data in different memory + * store. + * + */ +@Private +@Unstable +public class MemoryApplicationHistoryStore extends AbstractService implements + ApplicationHistoryStore { + + private final ConcurrentMap<ApplicationId, ApplicationHistoryData> applicationData = + new ConcurrentHashMap<ApplicationId, ApplicationHistoryData>(); + private final ConcurrentMap<ApplicationId, ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData>> applicationAttemptData = + new ConcurrentHashMap<ApplicationId, ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData>>(); + private final ConcurrentMap<ApplicationAttemptId, ConcurrentMap<ContainerId, ContainerHistoryData>> containerData = + new ConcurrentHashMap<ApplicationAttemptId, ConcurrentMap<ContainerId, ContainerHistoryData>>(); + + public MemoryApplicationHistoryStore() { + super(MemoryApplicationHistoryStore.class.getName()); + } + + @Override + public Map<ApplicationId, ApplicationHistoryData> getAllApplications() { + return new HashMap<ApplicationId, ApplicationHistoryData>(applicationData); + } + + @Override + public ApplicationHistoryData getApplication(ApplicationId appId) { + return applicationData.get(appId); + } + + @Override + public Map<ApplicationAttemptId, ApplicationAttemptHistoryData> + getApplicationAttempts(ApplicationId appId) { + ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap = + applicationAttemptData.get(appId); + if (subMap == null) { + return Collections + .<ApplicationAttemptId, ApplicationAttemptHistoryData> emptyMap(); + } else { + return new HashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>( + subMap); + } + } + + @Override + public ApplicationAttemptHistoryData getApplicationAttempt( + ApplicationAttemptId appAttemptId) { + ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap = + applicationAttemptData.get(appAttemptId.getApplicationId()); + if (subMap == null) { + return null; + } else { + return subMap.get(appAttemptId); + } + } + + @Override + public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId) { + ApplicationAttemptHistoryData appAttempt = + getApplicationAttempt(appAttemptId); + if (appAttempt == null || appAttempt.getMasterContainerId() == null) { + return null; + } else { + return getContainer(appAttempt.getMasterContainerId()); + } + } + + @Override + public ContainerHistoryData getContainer(ContainerId containerId) { + Map<ContainerId, ContainerHistoryData> subMap = + containerData.get(containerId.getApplicationAttemptId()); + if (subMap == null) { + return null; + } else { + return subMap.get(containerId); + } + } + + @Override + public Map<ContainerId, ContainerHistoryData> getContainers( + ApplicationAttemptId appAttemptId) throws IOException { + ConcurrentMap<ContainerId, ContainerHistoryData> subMap = + containerData.get(appAttemptId); + if (subMap == null) { + return Collections.<ContainerId, ContainerHistoryData> emptyMap(); + } else { + return new HashMap<ContainerId, ContainerHistoryData>(subMap); + } + } + + @Override + public void applicationStarted(ApplicationStartData appStart) + throws IOException { + ApplicationHistoryData oldData = + applicationData.putIfAbsent(appStart.getApplicationId(), + ApplicationHistoryData.newInstance(appStart.getApplicationId(), + appStart.getApplicationName(), appStart.getApplicationType(), + appStart.getQueue(), appStart.getUser(), appStart.getSubmitTime(), + appStart.getStartTime(), Long.MAX_VALUE, null, null, null)); + if (oldData != null) { + throw new IOException("The start information of application " + + appStart.getApplicationId() + " is already stored."); + } + } + + @Override + public void applicationFinished(ApplicationFinishData appFinish) + throws IOException { + ApplicationHistoryData data = + applicationData.get(appFinish.getApplicationId()); + if (data == null) { + throw new IOException("The finish information of application " + + appFinish.getApplicationId() + " is stored before the start" + + " information."); + } + // Make the assumption that YarnApplicationState should not be null if + // the finish information is already recorded + if (data.getYarnApplicationState() != null) { + throw new IOException("The finish information of application " + + appFinish.getApplicationId() + " is already stored."); + } + data.setFinishTime(appFinish.getFinishTime()); + data.setDiagnosticsInfo(appFinish.getDiagnosticsInfo()); + data.setFinalApplicationStatus(appFinish.getFinalApplicationStatus()); + data.setYarnApplicationState(appFinish.getYarnApplicationState()); + } + + @Override + public void applicationAttemptStarted( + ApplicationAttemptStartData appAttemptStart) throws IOException { + ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap = + getSubMap(appAttemptStart.getApplicationAttemptId().getApplicationId()); + ApplicationAttemptHistoryData oldData = + subMap.putIfAbsent(appAttemptStart.getApplicationAttemptId(), + ApplicationAttemptHistoryData.newInstance( + appAttemptStart.getApplicationAttemptId(), + appAttemptStart.getHost(), appAttemptStart.getRPCPort(), + appAttemptStart.getMasterContainerId(), null, null, null, null)); + if (oldData != null) { + throw new IOException("The start information of application attempt " + + appAttemptStart.getApplicationAttemptId() + " is already stored."); + } + } + + @Override + public void applicationAttemptFinished( + ApplicationAttemptFinishData appAttemptFinish) throws IOException { + ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap = + getSubMap(appAttemptFinish.getApplicationAttemptId().getApplicationId()); + ApplicationAttemptHistoryData data = + subMap.get(appAttemptFinish.getApplicationAttemptId()); + if (data == null) { + throw new IOException("The finish information of application attempt " + + appAttemptFinish.getApplicationAttemptId() + " is stored before" + + " the start information."); + } + // Make the assumption that YarnApplicationAttemptState should not be null + // if the finish information is already recorded + if (data.getYarnApplicationAttemptState() != null) { + throw new IOException("The finish information of application attempt " + + appAttemptFinish.getApplicationAttemptId() + " is already stored."); + } + data.setTrackingURL(appAttemptFinish.getTrackingURL()); + data.setDiagnosticsInfo(appAttemptFinish.getDiagnosticsInfo()); + data + .setFinalApplicationStatus(appAttemptFinish.getFinalApplicationStatus()); + data.setYarnApplicationAttemptState(appAttemptFinish + .getYarnApplicationAttemptState()); + } + + private ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> + getSubMap(ApplicationId appId) { + applicationAttemptData + .putIfAbsent( + appId, + new ConcurrentHashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>()); + return applicationAttemptData.get(appId); + } + + @Override + public void containerStarted(ContainerStartData containerStart) + throws IOException { + ConcurrentMap<ContainerId, ContainerHistoryData> subMap = + getSubMap(containerStart.getContainerId().getApplicationAttemptId()); + ContainerHistoryData oldData = + subMap.putIfAbsent(containerStart.getContainerId(), + ContainerHistoryData.newInstance(containerStart.getContainerId(), + containerStart.getAllocatedResource(), + containerStart.getAssignedNode(), containerStart.getPriority(), + containerStart.getStartTime(), Long.MAX_VALUE, null, + Integer.MAX_VALUE, null)); + if (oldData != null) { + throw new IOException("The start information of container " + + containerStart.getContainerId() + " is already stored."); + } + } + + @Override + public void containerFinished(ContainerFinishData containerFinish) + throws IOException { + ConcurrentMap<ContainerId, ContainerHistoryData> subMap = + getSubMap(containerFinish.getContainerId().getApplicationAttemptId()); + ContainerHistoryData data = subMap.get(containerFinish.getContainerId()); + if (data == null) { + throw new IOException("The finish information of container " + + containerFinish.getContainerId() + " is stored before" + + " the start information."); + } + // Make the assumption that ContainerState should not be null if + // the finish information is already recorded + if (data.getContainerState() != null) { + throw new IOException("The finish information of container " + + containerFinish.getContainerId() + " is already stored."); + } + data.setFinishTime(containerFinish.getFinishTime()); + data.setDiagnosticsInfo(containerFinish.getDiagnosticsInfo()); + data.setContainerExitStatus(containerFinish.getContainerExitStatus()); + data.setContainerState(containerFinish.getContainerState()); + } + + private ConcurrentMap<ContainerId, ContainerHistoryData> getSubMap( + ApplicationAttemptId appAttemptId) { + containerData.putIfAbsent(appAttemptId, + new ConcurrentHashMap<ContainerId, ContainerHistoryData>()); + return containerData.get(appAttemptId); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java new file mode 100644 index 0000000..3660c10 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; + +/** + * Dummy implementation of {@link ApplicationHistoryStore}. If this + * implementation is used, no history data will be persisted. + * + */ +@Unstable +@Private +public class NullApplicationHistoryStore extends AbstractService implements + ApplicationHistoryStore { + + public NullApplicationHistoryStore() { + super(NullApplicationHistoryStore.class.getName()); + } + + @Override + public void applicationStarted(ApplicationStartData appStart) + throws IOException { + } + + @Override + public void applicationFinished(ApplicationFinishData appFinish) + throws IOException { + } + + @Override + public void applicationAttemptStarted( + ApplicationAttemptStartData appAttemptStart) throws IOException { + } + + @Override + public void applicationAttemptFinished( + ApplicationAttemptFinishData appAttemptFinish) throws IOException { + } + + @Override + public void containerStarted(ContainerStartData containerStart) + throws IOException { + } + + @Override + public void containerFinished(ContainerFinishData containerFinish) + throws IOException { + } + + @Override + public ApplicationHistoryData getApplication(ApplicationId appId) + throws IOException { + return null; + } + + @Override + public Map<ApplicationId, ApplicationHistoryData> getAllApplications() + throws IOException { + return Collections.emptyMap(); + } + + @Override + public Map<ApplicationAttemptId, ApplicationAttemptHistoryData> + getApplicationAttempts(ApplicationId appId) throws IOException { + return Collections.emptyMap(); + } + + @Override + public ApplicationAttemptHistoryData getApplicationAttempt( + ApplicationAttemptId appAttemptId) throws IOException { + return null; + } + + @Override + public ContainerHistoryData getContainer(ContainerId containerId) + throws IOException { + return null; + } + + @Override + public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId) + throws IOException { + return null; + } + + @Override + public Map<ContainerId, ContainerHistoryData> getContainers( + ApplicationAttemptId appAttemptId) throws IOException { + return Collections.emptyMap(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/LoadRunner.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/LoadRunner.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/LoadRunner.java new file mode 100644 index 0000000..7974a5f --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/LoadRunner.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator; + +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.data.AppID; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.data.ApplicationInstance; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.data.HostMetricsGenerator; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.data.MetricsGeneratorConfigurer; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.net.MetricsSender; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.net.RestMetricsSender; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.net.StdOutMetricsSender; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.util.TimeStampProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.concurrent.*; + +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.data.AppID.MASTER_APPS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.data.AppID.SLAVE_APPS; + +/** + * + */ +public class LoadRunner { + private final static Logger LOG = LoggerFactory.getLogger(LoadRunner.class); + + private final ScheduledExecutorService timer; + private final ExecutorService workersPool; + private final Collection<Callable<String>> workers; + private final long startTime = new Date().getTime(); + private final int collectIntervalMillis; + private final int sendIntervalMillis; + + public LoadRunner(String hostName, + int threadCount, + String metricsHostName, + int collectIntervalMillis, + int sendIntervalMillis, + boolean createMaster) { + this.collectIntervalMillis = collectIntervalMillis; + this.workersPool = Executors.newFixedThreadPool(threadCount); + this.timer = Executors.newScheduledThreadPool(1); + this.sendIntervalMillis = sendIntervalMillis; + + workers = prepareWorkers(hostName, threadCount, metricsHostName, createMaster); + } + + private Collection<Callable<String>> prepareWorkers(String hostName, + int threadCount, + String metricsHost, + Boolean createMaster) { + Collection<Callable<String>> senderWorkers = + new ArrayList<Callable<String>>(threadCount); + + int startIndex = 0; + if (createMaster) { + String simHost = hostName + ".0"; + addMetricsWorkers(senderWorkers, simHost, metricsHost, MASTER_APPS); + startIndex++; + } + + for (int i = startIndex; i < threadCount; i++) { + String simHost = hostName + "." + i; + addMetricsWorkers(senderWorkers, simHost, metricsHost, SLAVE_APPS); + } + + return senderWorkers; + } + + private void addMetricsWorkers(Collection<Callable<String>> senderWorkers, + String specificHostName, + String metricsHostName, + AppID[] apps) { + for (AppID app : apps) { + HostMetricsGenerator metricsGenerator = + createApplicationMetrics(specificHostName, app); + MetricsSender sender = new RestMetricsSender(metricsHostName); + senderWorkers.add(new MetricsSenderWorker(sender, metricsGenerator)); + } + } + + private HostMetricsGenerator createApplicationMetrics(String simHost, AppID host) { + ApplicationInstance appInstance = new ApplicationInstance(simHost, host, ""); + TimeStampProvider timeStampProvider = new TimeStampProvider(startTime, + collectIntervalMillis, sendIntervalMillis); + + return MetricsGeneratorConfigurer + .createMetricsForHost(appInstance, timeStampProvider); + } + + public void start() { + timer.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + runOnce(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }, 0, sendIntervalMillis, TimeUnit.MILLISECONDS); + } + + public void runOnce() throws InterruptedException { + List<Future<String>> futures = workersPool.invokeAll(workers, + sendIntervalMillis / 2, + TimeUnit.MILLISECONDS); + int done = 0; + + // TODO: correctly count the failed tasks + for (Future<String> future : futures) { + done += future.isDone() ? 1 : 0; + } + + LOG.info("Finished successfully " + done + " tasks "); + } + + public void shutdown() { + timer.shutdownNow(); + workersPool.shutdownNow(); + } + + public static void main(String[] args) { + LoadRunner runner = + new LoadRunner("local", 2, "metrics", 10000, 20000, false); + + runner.start(); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/MetricsLoadSimulator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/MetricsLoadSimulator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/MetricsLoadSimulator.java new file mode 100644 index 0000000..a0c1bd2 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/MetricsLoadSimulator.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Sample Usage: + * <pre> + * $ java -cp "dependency/*":LoadSimulator-1.0-SNAPSHOT.jar \ + * org.apache.ambari.metrics.MetricsLoadSimulator \ + * -h "bartosz.laptop" -n 2 -m "162.216.148.45" -c 10000 -s 30000</pre> + */ +public class MetricsLoadSimulator { + private final static Logger LOG = LoggerFactory.getLogger(MetricsLoadSimulator + .class); + + public static void main(String[] args) throws IOException, InterruptedException { + Map<String, String> mapArgs = parseArgs(args); + + LoadRunner loadRunner = new LoadRunner( + mapArgs.get("hostName"), + Integer.valueOf(mapArgs.get("numberOfHosts")), + mapArgs.get("metricsHostName"), + Integer.valueOf(mapArgs.get("collectInterval")), + Integer.valueOf(mapArgs.get("sendInterval")), + Boolean.valueOf(mapArgs.get("master")) + ); + + loadRunner.start(); + } + + private static Map<String, String> parseArgs(String[] args) { + Map<String, String> mapProps = new HashMap<String, String>(); + mapProps.put("hostName", "host"); + mapProps.put("numberOfHosts", "20"); + mapProps.put("trafficType", "burst"); + mapProps.put("metricsHostName", "localhost"); + mapProps.put("collectInterval", "10000"); + mapProps.put("sendInterval", "60000"); + + if (args.length == 0) { + printUsage(); + throw new RuntimeException("Unexpected argument, See usage message."); + } else { + for (int i = 0; i < args.length; i += 2) { + String arg = args[i]; + if (arg.equals("-h")) { + mapProps.put("hostName", args[i + 1]); + } else if (arg.equals("-n")) { + mapProps.put("numberOfHosts", args[i + 1]); + } else if (arg.equals("-t")) { + mapProps.put("trafficType", args[i + 1]); + } else if (arg.equals("-m")) { + mapProps.put("metricsHostName", args[i + 1]); + } else if (arg.equals("-c")) { + mapProps.put("collectInterval", args[i + 1]); + } else if (arg.equals("-s")) { + mapProps.put("sendInterval", args[i + 1]); + } else if (arg.equals("-M")) { + mapProps.put("master", args[i + 1]); + } else if (arg.equals("-d")) { + // a dummy switch - it says that we agree with defaults + } else { + printUsage(); + throw new RuntimeException("Unexpected argument, See usage message."); + } + } + } + + LOG.info("Recognized options: baseHostName={} hosts#={} trafficMode={} " + + "metricsHostName={} collectIntervalMillis={} sendIntervalMillis={} " + + "simulateMaster={}", + mapProps.get("hostName"), + Integer.valueOf(mapProps.get("numberOfHosts")), + mapProps.get("trafficType"), + mapProps.get("metricsHostName"), + Integer.valueOf(mapProps.get("collectInterval")), + Integer.valueOf(mapProps.get("sendInterval")), + Boolean.valueOf(mapProps.get("master")) + ); + + return mapProps; + } + + public static void printUsage() { + System.err.println("Usage: java MetricsLoadSimulator [OPTIONS]"); + System.err.println("Options: "); + System.err.println("[-h hostName] [-n numberOfHosts] " + + "[-t trafficMode {burst, staggered}] [-m metricsHostName] " + + "[-c collectIntervalMillis {10 sec}] [-s sendIntervalMillis {60 sec}]" + + "[-M simulateMaster {true, false}] "); + System.err.println(); + System.err.println("When you select a master, then one simulated host will play"); + System.err.println("a role of a master, and the rest will be slaves. Otherwise"); + System.err.println("all simulation threads (single thread is for single host)"); + System.err.println("will be slave hosts"); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/MetricsSenderWorker.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/MetricsSenderWorker.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/MetricsSenderWorker.java new file mode 100644 index 0000000..c027933 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/MetricsSenderWorker.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimulator; + + +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.data.AppMetrics; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.data.HostMetricsGenerator; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.net.MetricsSender; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.net.RestMetricsSender; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.util.Json; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.Callable; + +/** + */ +public class MetricsSenderWorker implements Callable<String> { + private final static Logger LOG = LoggerFactory.getLogger(RestMetricsSender.class); + + MetricsSender sender; + HostMetricsGenerator hmg; + + public MetricsSenderWorker(MetricsSender sender, HostMetricsGenerator metricsGenerator) { + this.sender = sender; + hmg = metricsGenerator; + } + + @Override + public String call() throws Exception { + AppMetrics hostMetrics = hmg.createMetrics(); + + try { + String request = new Json().serialize(hostMetrics); + String response = sender.pushMetrics(request); + + return response; + } catch (IOException e) { + LOG.error("Error while pushing metrics: ", e); + throw e; + } + + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/AppID.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/AppID.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/AppID.java new file mode 100644 index 0000000..4f58dc5 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/AppID.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.data; + +public enum AppID { + HOST("HOST"), + NAMENODE("namenode"), + RESOURCEMANAGER("resourcemanager"), + DATANODE("datanode"), + NODEMANAGER("nodemanager"), + MASTER_HBASE("hbase"), + SLAVE_HBASE("hbase"); + + public static final AppID[] MASTER_APPS = {HOST, NAMENODE, RESOURCEMANAGER, MASTER_HBASE}; + public static final AppID[] SLAVE_APPS = {HOST, DATANODE, NODEMANAGER, SLAVE_HBASE}; + + private String id; + + private AppID(String id) { + this.id = id; + } + + public String getId() { + return id; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/AppMetrics.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/AppMetrics.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/AppMetrics.java new file mode 100644 index 0000000..d9cec2b --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/AppMetrics.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.data; + +import java.util.ArrayList; +import java.util.Collection; + +/** + * AppMetrics is a class that helps to create properly initialized metrics for + * current app. It also holds the + * metrics and can be serialized to json. + */ +public class AppMetrics { + + private final Collection<Metric> metrics = new ArrayList<Metric>(); + private final transient ApplicationInstance applicationId; + private final transient long startTime; + + public AppMetrics(ApplicationInstance applicationId, long startTime) { + this.applicationId = applicationId; + this.startTime = startTime; + } + + public Metric createMetric(String metricName) { + return new Metric(applicationId, metricName, startTime); + } + + public void addMetric(Metric metric) { + metrics.add(metric); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/ApplicationInstance.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/ApplicationInstance.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/ApplicationInstance.java new file mode 100644 index 0000000..d99ecc9 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/ApplicationInstance.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.data; + +/** + * AppId is a helper class that encapsulates the common part of metrics message. + * It contains hostName, appId and instanceId. It is immutable, + * and it can not hold null values. + */ +public final class ApplicationInstance { + + private final transient String hostName; + private final transient AppID appId; + private final transient String instanceId; + + /** + * @param hostname + * @param appId + * @param instanceId + */ + public ApplicationInstance(String hostname, AppID appId, String instanceId) { + if (hostname == null || appId == null || instanceId == null) + throw new IllegalArgumentException("ApplicationInstance can not be " + + "instantiated with null values"); + + this.hostName = hostname; + this.appId = appId; + this.instanceId = instanceId; + } + + public String getInstanceId() { + return instanceId; + } + + public AppID getAppId() { + return appId; + } + + public String getHostName() { + return hostName; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/HostMetricsGenerator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/HostMetricsGenerator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/HostMetricsGenerator.java new file mode 100644 index 0000000..61a6624 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/HostMetricsGenerator.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimulator.data; + + +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.util.RandomMetricsProvider; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.util.TimeStampProvider; + +import java.util.HashMap; +import java.util.Map; + +/** + */ +public class HostMetricsGenerator { + + private Map<String, RandomMetricsProvider> metricDataProviders = new HashMap<String, RandomMetricsProvider>(); + private final TimeStampProvider tsp; + private final ApplicationInstance id; + + public HostMetricsGenerator(ApplicationInstance id, + TimeStampProvider timeStamps, + Map<String, RandomMetricsProvider> metricDataProviders) { + this.id = id; + this.tsp = timeStamps; + this.metricDataProviders = metricDataProviders; + } + + public AppMetrics createMetrics() { + long[] timestamps = tsp.timestampsForNextInterval(); + AppMetrics appMetrics = new AppMetrics(id, timestamps[0]); + + for (Map.Entry<String, RandomMetricsProvider> entry : metricDataProviders.entrySet()) { + String metricName = entry.getKey(); + RandomMetricsProvider metricData = entry.getValue(); + + Metric metric = appMetrics.createMetric(metricName); + for (long timestamp : timestamps) { + metric.putMetric(timestamp, String.valueOf(metricData.next())); + } + appMetrics.addMetric(metric); + } + + return appMetrics; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/Metric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/Metric.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/Metric.java new file mode 100644 index 0000000..f274263 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/Metric.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimulator.data; + +import java.util.LinkedHashMap; +import java.util.Map; + +public class Metric { + + private String instanceid; + private String hostname; + private Map<String, String> metrics = new LinkedHashMap<String, String>(); + private String starttime; + private String appid; + private String metricname; + + // i don't like this ctor, but it has to be public for json deserialization + public Metric() { + } + + public Metric(ApplicationInstance app, String metricName, long startTime) { + this.hostname = app.getHostName(); + this.appid = app.getAppId().getId(); + this.instanceid = app.getInstanceId(); + this.metricname = metricName; + this.starttime = Long.toString(startTime); + } + + public void putMetric(long timestamp, String value) { + metrics.put(Long.toString(timestamp), value); + } + + public String getInstanceid() { + return instanceid; + } + + public String getHostname() { + return hostname; + } + + public Map<String, String> getMetrics() { + return metrics; + } + + public String getStarttime() { + return starttime; + } + + public String getAppid() { + return appid; + } + + public String getMetricname() { + return metricname; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/MetricsGeneratorConfigurer.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/MetricsGeneratorConfigurer.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/MetricsGeneratorConfigurer.java new file mode 100644 index 0000000..b3401b2 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/data/MetricsGeneratorConfigurer.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimulator.data; + + +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.util.RandomMetricsProvider; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.util.TimeStampProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.Map; + +/** + * MetricsGeneratorConfigurer is a factory that reads metrics definition from a file, + * and returns an Single HostMetricsGenerator. Check createMetricsForHost method + * for details. + */ +public class MetricsGeneratorConfigurer { + + private final static Logger LOG = LoggerFactory.getLogger + (MetricsGeneratorConfigurer.class); + + /** + * Creates HostMetricsGenerator configured with metric names loaded from file. + * + * @param id ApplicationInstance descriptor, will be used to create + * HostMetricsGenerator, cannot be null + * @param timeStamps configured TimeStampProvider that can provide next + * timestamp, cannot be null + * @return HostMetricsGenerator with given ApplicationInstance id and configured + * mapping of + * metric names to data providers + */ + public static HostMetricsGenerator createMetricsForHost( + ApplicationInstance id, + TimeStampProvider timeStamps) { + return new HostMetricsGenerator(id, timeStamps, readMetrics(id.getAppId())); + } + + private static Map<String, RandomMetricsProvider> readMetrics(AppID type) { + InputStream input = null; + Map<String, RandomMetricsProvider> metrics = + new HashMap<String, RandomMetricsProvider>(); + String fileName = "metrics_def/" + type.toString() + ".dat"; + + try { + LOG.info("Loading " + fileName); + + input = MetricsGeneratorConfigurer.class.getClassLoader() + .getResourceAsStream(fileName); + + BufferedReader reader = new BufferedReader(new InputStreamReader(input)); + + String line; + while ((line = reader.readLine()) != null) { + metrics.put(line.trim(), new RandomMetricsProvider(100, 200)); + } + + } catch (IOException e) { + LOG.error("Cannot read file " + fileName + " for appID " + type.toString(), e); + } finally { + if (input != null) { + try { + input.close(); + } catch (IOException ex) { + // intentionally left blank, here we cannot do anything + } + } + } + + return metrics; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/MetricsSender.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/MetricsSender.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/MetricsSender.java new file mode 100644 index 0000000..35c0fc3 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/MetricsSender.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimulator.net; + +/** + * MetricSender should provides a simple way of pushing metrics to some service. + */ +public interface MetricsSender { + /** + * Push metrics to the metric service (e.g. a metrics storage system). + * + * @param payload the payload to be sent to metrics service + * @return response message either acknowledgement or error + */ + String pushMetrics(String payload); +}