http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryReader.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryReader.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryReader.java new file mode 100644 index 0000000..590853a --- /dev/null +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryReader.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; + +/** + * Read-side interface of the application history: lookups of the history + * records of applications, application attempts and containers. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface ApplicationHistoryReader { + + /** + * This method returns the {@link ApplicationHistoryData} for the + * specified {@link ApplicationId}. + * + * @param appId + * {@link ApplicationId} + * @return {@link ApplicationHistoryData} for the ApplicationId. + * @throws IOException + */ + ApplicationHistoryData getApplication(ApplicationId appId) throws IOException; + + /** + * This method returns the {@link ApplicationHistoryData} of all applications. + * + * @return map of {@link ApplicationId} to {@link ApplicationHistoryData}s. + * @throws IOException + */ + Map<ApplicationId, ApplicationHistoryData> getAllApplications() + throws IOException; + + /** + * An application can have multiple application attempts, each described by + * an {@link ApplicationAttemptHistoryData}. This method returns all + * {@link ApplicationAttemptHistoryData}s for the application. + * + * @param appId + * {@link ApplicationId} + * @return map of {@link ApplicationAttemptId} to + * {@link ApplicationAttemptHistoryData} for the application. + * @throws IOException + */ + Map<ApplicationAttemptId, ApplicationAttemptHistoryData> + getApplicationAttempts(ApplicationId appId) throws IOException; + + /** + * This method returns the {@link ApplicationAttemptHistoryData} for the + * specified {@link ApplicationAttemptId}.
+ * + * @param appAttemptId + * {@link ApplicationAttemptId} + * @return {@link ApplicationAttemptHistoryData} for ApplicationAttemptId + * @throws IOException + */ + ApplicationAttemptHistoryData getApplicationAttempt( + ApplicationAttemptId appAttemptId) throws IOException; + + /** + * This method returns the {@link ContainerHistoryData} for the specified + * {@link ContainerId}. + * + * @param containerId + * {@link ContainerId} + * @return {@link ContainerHistoryData} for ContainerId + * @throws IOException + */ + ContainerHistoryData getContainer(ContainerId containerId) throws IOException; + + /** + * This method returns the {@link ContainerHistoryData} of the AM (master) + * container of the specified {@link ApplicationAttemptId}. + * + * @param appAttemptId + * {@link ApplicationAttemptId} + * @return {@link ContainerHistoryData} for ApplicationAttemptId + * @throws IOException + */ + ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId) + throws IOException; + + /** + * This method returns a map of {@link ContainerId} to + * {@link ContainerHistoryData} for the specified {@link ApplicationAttemptId}. + * + * @param appAttemptId + * {@link ApplicationAttemptId} + * @return map of {@link ContainerId} to {@link ContainerHistoryData} for + * ApplicationAttemptId + * @throws IOException + */ + Map<ContainerId, ContainerHistoryData> getContainers( + ApplicationAttemptId appAttemptId) throws IOException; +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java new file mode 100644 index 0000000..f622153 --- /dev/null +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.source.JvmMetrics; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricStore; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricStore; +import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore; +import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.LeveldbTimelineStore; +import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp; +import org.apache.hadoop.yarn.webapp.WebApp; +import org.apache.hadoop.yarn.webapp.WebApps; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; + +import com.google.common.annotations.VisibleForTesting; + +/** + * History server that keeps track of all types of history in the cluster. + * Application specific history to start with. Besides the application + * history services it creates a {@link TimelineStore} and a + * {@link TimelineMetricStore} and hands both to the {@link AHSWebApp}.
+ */ +public class ApplicationHistoryServer extends CompositeService { + + public static final int SHUTDOWN_HOOK_PRIORITY = 30; + private static final Log LOG = LogFactory + .getLog(ApplicationHistoryServer.class); + + ApplicationHistoryClientService ahsClientService; + ApplicationHistoryManager historyManager; + TimelineStore timelineStore; + TimelineMetricStore timelineMetricStore; + private WebApp webApp; + + public ApplicationHistoryServer() { + super(ApplicationHistoryServer.class.getName()); + } + + /** + * Creates the history manager, the client service, the timeline store and + * the timeline metric store, hands them to this composite service through + * addService/addIfService and then initializes the parent service. + * + * @param conf + * configuration used to create the two stores and passed on to the + * parent service + * @throws Exception + */ + @Override + protected void serviceInit(Configuration conf) throws Exception { + historyManager = createApplicationHistory(); + ahsClientService = createApplicationHistoryClientService(historyManager); + addService(ahsClientService); + addService((Service) historyManager); + timelineStore = createTimelineStore(conf); + timelineMetricStore = createTimelineMetricStore(conf); + addIfService(timelineStore); + addIfService(timelineMetricStore); + super.serviceInit(conf); + } + + /** + * Initializes the metrics system and the JVM metrics under the name + * "ApplicationHistoryServer", starts the web application and then starts + * the parent service. + * + * @throws Exception + */ + @Override + protected void serviceStart() throws Exception { + DefaultMetricsSystem.initialize("ApplicationHistoryServer"); + JvmMetrics.initSingleton("ApplicationHistoryServer", null); + + startWebApp(); + super.serviceStart(); + } + + /** + * Stops the web application if one was created, shuts down the metrics + * system and then stops the parent service. + * + * @throws Exception + */ + @Override + protected void serviceStop() throws Exception { + if (webApp != null) { + webApp.stop(); + } + + DefaultMetricsSystem.shutdown(); + super.serviceStop(); + } + + @Private + @VisibleForTesting + public ApplicationHistoryClientService getClientService() { + return this.ahsClientService; + } + + protected ApplicationHistoryClientService + createApplicationHistoryClientService( + ApplicationHistoryManager historyManager) { + return new ApplicationHistoryClientService(historyManager); + } + + protected ApplicationHistoryManager createApplicationHistory() { + return new ApplicationHistoryManagerImpl(); + } + + protected ApplicationHistoryManager getApplicationHistory() { + return this.historyManager; + } + + /** + * Installs the {@link YarnUncaughtExceptionHandler}, logs the startup + * message, then creates an {@link ApplicationHistoryServer}, registers a + * shutdown hook for it and initializes and starts it with a new + * {@link YarnConfiguration}. Any {@link Throwable} raised while doing so is + * logged as fatal and followed by ExitUtil.terminate(-1, ...). + * + * @param args + * command line arguments; only passed to the startup message + * @return the launched server + */ + static ApplicationHistoryServer
launchAppHistoryServer(String[] args) { + Thread + .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); + StringUtils.startupShutdownMessage(ApplicationHistoryServer.class, args, + LOG); + ApplicationHistoryServer appHistoryServer = null; + try { + appHistoryServer = new ApplicationHistoryServer(); + ShutdownHookManager.get().addShutdownHook( + new CompositeServiceShutdownHook(appHistoryServer), + SHUTDOWN_HOOK_PRIORITY); + YarnConfiguration conf = new YarnConfiguration(); + appHistoryServer.init(conf); + appHistoryServer.start(); + } catch (Throwable t) { + LOG.fatal("Error starting ApplicationHistoryServer", t); + ExitUtil.terminate(-1, "Error starting ApplicationHistoryServer"); + } + return appHistoryServer; + } + + public static void main(String[] args) { + launchAppHistoryServer(args); + } + + /** + * Returns a new {@link ApplicationHistoryManagerImpl}; the configuration + * argument is not used. NOTE(review): serviceInit calls + * createApplicationHistory() instead, so this method looks unused within + * this class - confirm against subclasses before relying on it. + */ + protected ApplicationHistoryManager createApplicationHistoryManager( + Configuration conf) { + return new ApplicationHistoryManagerImpl(); + } + + /** + * Instantiates the {@link TimelineStore} implementation configured under + * YarnConfiguration.TIMELINE_SERVICE_STORE, with + * {@link LeveldbTimelineStore} passed as the default class. + */ + protected TimelineStore createTimelineStore( + Configuration conf) { + return ReflectionUtils.newInstance(conf.getClass( + YarnConfiguration.TIMELINE_SERVICE_STORE, LeveldbTimelineStore.class, + TimelineStore.class), conf); + } + + /** + * Instantiates an {@link HBaseTimelineMetricStore}. Unlike + * createTimelineStore, the implementation class is fixed here and not read + * from the configuration. + */ + protected TimelineMetricStore createTimelineMetricStore(Configuration conf) { + LOG.info("Creating metrics store."); + return ReflectionUtils.newInstance(HBaseTimelineMetricStore.class, conf); + } + + /** + * Builds and starts the {@link AHSWebApp} at the address returned by + * WebAppUtils.getAHSWebAppURLWithoutScheme and keeps the resulting + * {@link WebApp} so that serviceStop can stop it. A failure to start is + * logged and rethrown wrapped in a {@link YarnRuntimeException}. + */ + protected void startWebApp() { + String bindAddress = WebAppUtils.getAHSWebAppURLWithoutScheme(getConfig()); + LOG.info("Instantiating AHSWebApp at " + bindAddress); + try { + webApp = + WebApps + .$for("applicationhistory", ApplicationHistoryClientService.class, + ahsClientService, "ws") + .with(getConfig()) + .withHttpSpnegoPrincipalKey( + YarnConfiguration.TIMELINE_SERVICE_WEBAPP_SPNEGO_USER_NAME_KEY) + .withHttpSpnegoKeytabKey( + YarnConfiguration.TIMELINE_SERVICE_WEBAPP_SPNEGO_KEYTAB_FILE_KEY) + .at(bindAddress) + .start(new AHSWebApp(historyManager, timelineStore,
timelineMetricStore)); + } catch (Exception e) { + String msg = "AHSWebApp failed to start."; + LOG.error(msg, e); + throw new YarnRuntimeException(msg, e); + } + } + /** + * @return the {@link TimelineStore} created in serviceInit + */ + @Private + @VisibleForTesting + public TimelineStore getTimelineStore() { + return timelineStore; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryStore.java new file mode 100644 index 0000000..c26faef --- /dev/null +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryStore.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.service.Service; + +/** + * This interface is the abstraction of the storage of the application history + * data. It is a {@link Service}, so that an implementation of this interface + * can make use of the service life cycle to initialize and clean up the + * storage. Users can access the storage via the {@link ApplicationHistoryReader} + * and {@link ApplicationHistoryWriter} interfaces, both of which it extends. + * + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface ApplicationHistoryStore extends Service, + ApplicationHistoryReader, ApplicationHistoryWriter { +} http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java new file mode 100644 index 0000000..09ba36d --- /dev/null +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryWriter.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; + +/** + * This is the interface for writing the application history. It exposes one + * method for each of {@link ApplicationStartData}, {@link ApplicationFinishData}, + * {@link ApplicationAttemptStartData}, {@link ApplicationAttemptFinishData}, + * {@link ContainerStartData} and {@link ContainerFinishData}. + */ +@Private +@Unstable +public interface ApplicationHistoryWriter { + + /** + * This method writes the information of <code>RMApp</code> that is available + * when it starts.
+ * + * @param appStart + * the record of the information of <code>RMApp</code> that is + * available when it starts + * @throws IOException + */ + void applicationStarted(ApplicationStartData appStart) throws IOException; + + /** + * This method writes the information of <code>RMApp</code> that is available + * when it finishes. + * + * @param appFinish + * the record of the information of <code>RMApp</code> that is + * available when it finishes + * @throws IOException + */ + void applicationFinished(ApplicationFinishData appFinish) throws IOException; + + /** + * This method writes the information of <code>RMAppAttempt</code> that is + * available when it starts. + * + * @param appAttemptStart + * the record of the information of <code>RMAppAttempt</code> that is + * available when it starts + * @throws IOException + */ + void applicationAttemptStarted(ApplicationAttemptStartData appAttemptStart) + throws IOException; + + /** + * This method writes the information of <code>RMAppAttempt</code> that is + * available when it finishes. + * + * @param appAttemptFinish + * the record of the information of <code>RMAppAttempt</code> that is + * available when it finishes + * @throws IOException + */ + void + applicationAttemptFinished(ApplicationAttemptFinishData appAttemptFinish) + throws IOException; + + /** + * This method writes the information of <code>RMContainer</code> that is + * available when it starts. + * + * @param containerStart + * the record of the information of <code>RMContainer</code> that is + * available when it starts + * @throws IOException + */ + void containerStarted(ContainerStartData containerStart) throws IOException; + + /** + * This method writes the information of <code>RMContainer</code> that is + * available when it finishes.
+ * + * @param containerFinish + * the record of the information of <code>RMContainer</code> that is + * available when it finishes + * @throws IOException + */ + void containerFinished(ContainerFinishData containerFinish) + throws IOException; + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java new file mode 100644 index 0000000..4c8d745 --- /dev/null +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/FileSystemApplicationHistoryStore.java @@ -0,0 +1,784 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.file.tfile.TFile; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptFinishDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationAttemptStartDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationFinishDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ApplicationStartDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerFinishDataProto; +import org.apache.hadoop.yarn.proto.ApplicationHistoryServerProtos.ContainerStartDataProto; 
+import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptFinishDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationAttemptStartDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationFinishDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ApplicationStartDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerFinishDataPBImpl; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.impl.pb.ContainerStartDataPBImpl; +import org.apache.hadoop.yarn.util.ConverterUtils; + +import com.google.protobuf.InvalidProtocolBufferException; + +/** + * File system implementation of {@link ApplicationHistoryStore}. In this + * implementation, one application will have just one file in the file system, + * which contains all the history data of one application, and its attempts and + * containers. 
{@link #applicationStarted(ApplicationStartData)} is supposed to + * be invoked first when writing any history data of one application and it will + * open a file, while {@link #applicationFinished(ApplicationFinishData)} is + * supposed to be last writing operation and will close the file. + */ +@Public +@Unstable +public class FileSystemApplicationHistoryStore extends AbstractService + implements ApplicationHistoryStore { + + private static final Log LOG = LogFactory + .getLog(FileSystemApplicationHistoryStore.class); + + private static final String ROOT_DIR_NAME = "ApplicationHistoryDataRoot"; + private static final int MIN_BLOCK_SIZE = 256 * 1024; + private static final String START_DATA_SUFFIX = "_start"; + private static final String FINISH_DATA_SUFFIX = "_finish"; + private static final FsPermission ROOT_DIR_UMASK = FsPermission + .createImmutable((short) 0740); + private static final FsPermission HISTORY_FILE_UMASK = FsPermission + .createImmutable((short) 0640); + + private FileSystem fs; + private Path rootDirPath; + + private ConcurrentMap<ApplicationId, HistoryFileWriter> outstandingWriters = + new ConcurrentHashMap<ApplicationId, HistoryFileWriter>(); + + public FileSystemApplicationHistoryStore() { + super(FileSystemApplicationHistoryStore.class.getName()); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + Path fsWorkingPath = + new Path(conf.get(YarnConfiguration.FS_APPLICATION_HISTORY_STORE_URI)); + rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME); + try { + fs = fsWorkingPath.getFileSystem(conf); + fs.mkdirs(rootDirPath); + fs.setPermission(rootDirPath, ROOT_DIR_UMASK); + } catch (IOException e) { + LOG.error("Error when initializing FileSystemHistoryStorage", e); + throw e; + } + super.serviceInit(conf); + } + + @Override + public void serviceStop() throws Exception { + try { + for (Entry<ApplicationId, HistoryFileWriter> entry : outstandingWriters + .entrySet()) { + entry.getValue().close(); + } + 
outstandingWriters.clear(); + } finally { + IOUtils.cleanup(LOG, fs); + } + super.serviceStop(); + } + + @Override + public ApplicationHistoryData getApplication(ApplicationId appId) + throws IOException { + HistoryFileReader hfReader = getHistoryFileReader(appId); + try { + boolean readStartData = false; + boolean readFinishData = false; + ApplicationHistoryData historyData = + ApplicationHistoryData.newInstance(appId, null, null, null, null, + Long.MIN_VALUE, Long.MIN_VALUE, Long.MAX_VALUE, null, + FinalApplicationStatus.UNDEFINED, null); + while ((!readStartData || !readFinishData) && hfReader.hasNext()) { + HistoryFileReader.Entry entry = hfReader.next(); + if (entry.key.id.equals(appId.toString())) { + if (entry.key.suffix.equals(START_DATA_SUFFIX)) { + ApplicationStartData startData = + parseApplicationStartData(entry.value); + mergeApplicationHistoryData(historyData, startData); + readStartData = true; + } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) { + ApplicationFinishData finishData = + parseApplicationFinishData(entry.value); + mergeApplicationHistoryData(historyData, finishData); + readFinishData = true; + } + } + } + if (!readStartData && !readFinishData) { + return null; + } + if (!readStartData) { + LOG.warn("Start information is missing for application " + appId); + } + if (!readFinishData) { + LOG.warn("Finish information is missing for application " + appId); + } + LOG.info("Completed reading history information of application " + appId); + return historyData; + } catch (IOException e) { + LOG.error("Error when reading history file of application " + appId); + throw e; + } finally { + hfReader.close(); + } + } + + @Override + public Map<ApplicationId, ApplicationHistoryData> getAllApplications() + throws IOException { + Map<ApplicationId, ApplicationHistoryData> historyDataMap = + new HashMap<ApplicationId, ApplicationHistoryData>(); + FileStatus[] files = fs.listStatus(rootDirPath); + for (FileStatus file : files) { + ApplicationId 
appId = + ConverterUtils.toApplicationId(file.getPath().getName()); + try { + ApplicationHistoryData historyData = getApplication(appId); + if (historyData != null) { + historyDataMap.put(appId, historyData); + } + } catch (IOException e) { + // Eat the exception not to disturb the getting the next + // ApplicationHistoryData + LOG.error("History information of application " + appId + + " is not included into the result due to the exception", e); + } + } + return historyDataMap; + } + + @Override + public Map<ApplicationAttemptId, ApplicationAttemptHistoryData> + getApplicationAttempts(ApplicationId appId) throws IOException { + Map<ApplicationAttemptId, ApplicationAttemptHistoryData> historyDataMap = + new HashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>(); + HistoryFileReader hfReader = getHistoryFileReader(appId); + try { + while (hfReader.hasNext()) { + HistoryFileReader.Entry entry = hfReader.next(); + if (entry.key.id.startsWith( + ConverterUtils.APPLICATION_ATTEMPT_PREFIX)) { + ApplicationAttemptId appAttemptId = + ConverterUtils.toApplicationAttemptId(entry.key.id); + if (appAttemptId.getApplicationId().equals(appId)) { + ApplicationAttemptHistoryData historyData = + historyDataMap.get(appAttemptId); + if (historyData == null) { + historyData = ApplicationAttemptHistoryData.newInstance( + appAttemptId, null, -1, null, null, null, + FinalApplicationStatus.UNDEFINED, null); + historyDataMap.put(appAttemptId, historyData); + } + if (entry.key.suffix.equals(START_DATA_SUFFIX)) { + mergeApplicationAttemptHistoryData(historyData, + parseApplicationAttemptStartData(entry.value)); + } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) { + mergeApplicationAttemptHistoryData(historyData, + parseApplicationAttemptFinishData(entry.value)); + } + } + } + } + LOG.info("Completed reading history information of all application" + + " attempts of application " + appId); + } catch (IOException e) { + LOG.info("Error when reading history information of some 
application" + + " attempts of application " + appId); + } finally { + hfReader.close(); + } + return historyDataMap; + } + + @Override + public ApplicationAttemptHistoryData getApplicationAttempt( + ApplicationAttemptId appAttemptId) throws IOException { + HistoryFileReader hfReader = + getHistoryFileReader(appAttemptId.getApplicationId()); + try { + boolean readStartData = false; + boolean readFinishData = false; + ApplicationAttemptHistoryData historyData = + ApplicationAttemptHistoryData.newInstance(appAttemptId, null, -1, + null, null, null, FinalApplicationStatus.UNDEFINED, null); + while ((!readStartData || !readFinishData) && hfReader.hasNext()) { + HistoryFileReader.Entry entry = hfReader.next(); + if (entry.key.id.equals(appAttemptId.toString())) { + if (entry.key.suffix.equals(START_DATA_SUFFIX)) { + ApplicationAttemptStartData startData = + parseApplicationAttemptStartData(entry.value); + mergeApplicationAttemptHistoryData(historyData, startData); + readStartData = true; + } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) { + ApplicationAttemptFinishData finishData = + parseApplicationAttemptFinishData(entry.value); + mergeApplicationAttemptHistoryData(historyData, finishData); + readFinishData = true; + } + } + } + if (!readStartData && !readFinishData) { + return null; + } + if (!readStartData) { + LOG.warn("Start information is missing for application attempt " + + appAttemptId); + } + if (!readFinishData) { + LOG.warn("Finish information is missing for application attempt " + + appAttemptId); + } + LOG.info("Completed reading history information of application attempt " + + appAttemptId); + return historyData; + } catch (IOException e) { + LOG.error("Error when reading history file of application attempt" + + appAttemptId); + throw e; + } finally { + hfReader.close(); + } + } + + @Override + public ContainerHistoryData getContainer(ContainerId containerId) + throws IOException { + HistoryFileReader hfReader = + 
getHistoryFileReader(containerId.getApplicationAttemptId() + .getApplicationId()); + try { + boolean readStartData = false; + boolean readFinishData = false; + ContainerHistoryData historyData = + ContainerHistoryData + .newInstance(containerId, null, null, null, Long.MIN_VALUE, + Long.MAX_VALUE, null, Integer.MAX_VALUE, null); + while ((!readStartData || !readFinishData) && hfReader.hasNext()) { + HistoryFileReader.Entry entry = hfReader.next(); + if (entry.key.id.equals(containerId.toString())) { + if (entry.key.suffix.equals(START_DATA_SUFFIX)) { + ContainerStartData startData = parseContainerStartData(entry.value); + mergeContainerHistoryData(historyData, startData); + readStartData = true; + } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) { + ContainerFinishData finishData = + parseContainerFinishData(entry.value); + mergeContainerHistoryData(historyData, finishData); + readFinishData = true; + } + } + } + if (!readStartData && !readFinishData) { + return null; + } + if (!readStartData) { + LOG.warn("Start information is missing for container " + containerId); + } + if (!readFinishData) { + LOG.warn("Finish information is missing for container " + containerId); + } + LOG.info("Completed reading history information of container " + + containerId); + return historyData; + } catch (IOException e) { + LOG.error("Error when reading history file of container " + containerId); + throw e; + } finally { + hfReader.close(); + } + } + + @Override + public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId) + throws IOException { + ApplicationAttemptHistoryData attemptHistoryData = + getApplicationAttempt(appAttemptId); + if (attemptHistoryData == null + || attemptHistoryData.getMasterContainerId() == null) { + return null; + } + return getContainer(attemptHistoryData.getMasterContainerId()); + } + + @Override + public Map<ContainerId, ContainerHistoryData> getContainers( + ApplicationAttemptId appAttemptId) throws IOException { + 
Map<ContainerId, ContainerHistoryData> historyDataMap = + new HashMap<ContainerId, ContainerHistoryData>(); + HistoryFileReader hfReader = + getHistoryFileReader(appAttemptId.getApplicationId()); + try { + while (hfReader.hasNext()) { + HistoryFileReader.Entry entry = hfReader.next(); + if (entry.key.id.startsWith(ConverterUtils.CONTAINER_PREFIX)) { + ContainerId containerId = + ConverterUtils.toContainerId(entry.key.id); + if (containerId.getApplicationAttemptId().equals(appAttemptId)) { + ContainerHistoryData historyData = + historyDataMap.get(containerId); + if (historyData == null) { + historyData = ContainerHistoryData.newInstance( + containerId, null, null, null, Long.MIN_VALUE, + Long.MAX_VALUE, null, Integer.MAX_VALUE, null); + historyDataMap.put(containerId, historyData); + } + if (entry.key.suffix.equals(START_DATA_SUFFIX)) { + mergeContainerHistoryData(historyData, + parseContainerStartData(entry.value)); + } else if (entry.key.suffix.equals(FINISH_DATA_SUFFIX)) { + mergeContainerHistoryData(historyData, + parseContainerFinishData(entry.value)); + } + } + } + } + LOG.info("Completed reading history information of all conatiners" + + " of application attempt " + appAttemptId); + } catch (IOException e) { + LOG.info("Error when reading history information of some containers" + + " of application attempt " + appAttemptId); + } finally { + hfReader.close(); + } + return historyDataMap; + } + + @Override + public void applicationStarted(ApplicationStartData appStart) + throws IOException { + HistoryFileWriter hfWriter = + outstandingWriters.get(appStart.getApplicationId()); + if (hfWriter == null) { + Path applicationHistoryFile = + new Path(rootDirPath, appStart.getApplicationId().toString()); + try { + hfWriter = new HistoryFileWriter(applicationHistoryFile); + LOG.info("Opened history file of application " + + appStart.getApplicationId()); + } catch (IOException e) { + LOG.error("Error when openning history file of application " + + 
appStart.getApplicationId()); + throw e; + } + outstandingWriters.put(appStart.getApplicationId(), hfWriter); + } else { + throw new IOException("History file of application " + + appStart.getApplicationId() + " is already opened"); + } + assert appStart instanceof ApplicationStartDataPBImpl; + try { + hfWriter.writeHistoryData(new HistoryDataKey(appStart.getApplicationId() + .toString(), START_DATA_SUFFIX), + ((ApplicationStartDataPBImpl) appStart).getProto().toByteArray()); + LOG.info("Start information of application " + + appStart.getApplicationId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing start information of application " + + appStart.getApplicationId()); + throw e; + } + } + + @Override + public void applicationFinished(ApplicationFinishData appFinish) + throws IOException { + HistoryFileWriter hfWriter = + getHistoryFileWriter(appFinish.getApplicationId()); + assert appFinish instanceof ApplicationFinishDataPBImpl; + try { + hfWriter.writeHistoryData(new HistoryDataKey(appFinish.getApplicationId() + .toString(), FINISH_DATA_SUFFIX), + ((ApplicationFinishDataPBImpl) appFinish).getProto().toByteArray()); + LOG.info("Finish information of application " + + appFinish.getApplicationId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing finish information of application " + + appFinish.getApplicationId()); + throw e; + } finally { + hfWriter.close(); + outstandingWriters.remove(appFinish.getApplicationId()); + } + } + + @Override + public void applicationAttemptStarted( + ApplicationAttemptStartData appAttemptStart) throws IOException { + HistoryFileWriter hfWriter = + getHistoryFileWriter(appAttemptStart.getApplicationAttemptId() + .getApplicationId()); + assert appAttemptStart instanceof ApplicationAttemptStartDataPBImpl; + try { + hfWriter.writeHistoryData(new HistoryDataKey(appAttemptStart + .getApplicationAttemptId().toString(), START_DATA_SUFFIX), + ((ApplicationAttemptStartDataPBImpl) 
appAttemptStart).getProto() + .toByteArray()); + LOG.info("Start information of application attempt " + + appAttemptStart.getApplicationAttemptId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing start information of application attempt " + + appAttemptStart.getApplicationAttemptId()); + throw e; + } + } + + @Override + public void applicationAttemptFinished( + ApplicationAttemptFinishData appAttemptFinish) throws IOException { + HistoryFileWriter hfWriter = + getHistoryFileWriter(appAttemptFinish.getApplicationAttemptId() + .getApplicationId()); + assert appAttemptFinish instanceof ApplicationAttemptFinishDataPBImpl; + try { + hfWriter.writeHistoryData(new HistoryDataKey(appAttemptFinish + .getApplicationAttemptId().toString(), FINISH_DATA_SUFFIX), + ((ApplicationAttemptFinishDataPBImpl) appAttemptFinish).getProto() + .toByteArray()); + LOG.info("Finish information of application attempt " + + appAttemptFinish.getApplicationAttemptId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing finish information of application attempt " + + appAttemptFinish.getApplicationAttemptId()); + throw e; + } + } + + @Override + public void containerStarted(ContainerStartData containerStart) + throws IOException { + HistoryFileWriter hfWriter = + getHistoryFileWriter(containerStart.getContainerId() + .getApplicationAttemptId().getApplicationId()); + assert containerStart instanceof ContainerStartDataPBImpl; + try { + hfWriter.writeHistoryData(new HistoryDataKey(containerStart + .getContainerId().toString(), START_DATA_SUFFIX), + ((ContainerStartDataPBImpl) containerStart).getProto().toByteArray()); + LOG.info("Start information of container " + + containerStart.getContainerId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing start information of container " + + containerStart.getContainerId()); + throw e; + } + } + + @Override + public void containerFinished(ContainerFinishData containerFinish) + 
throws IOException { + HistoryFileWriter hfWriter = + getHistoryFileWriter(containerFinish.getContainerId() + .getApplicationAttemptId().getApplicationId()); + assert containerFinish instanceof ContainerFinishDataPBImpl; + try { + hfWriter.writeHistoryData(new HistoryDataKey(containerFinish + .getContainerId().toString(), FINISH_DATA_SUFFIX), + ((ContainerFinishDataPBImpl) containerFinish).getProto().toByteArray()); + LOG.info("Finish information of container " + + containerFinish.getContainerId() + " is written"); + } catch (IOException e) { + LOG.error("Error when writing finish information of container " + + containerFinish.getContainerId()); + } + } + + private static ApplicationStartData parseApplicationStartData(byte[] value) + throws InvalidProtocolBufferException { + return new ApplicationStartDataPBImpl( + ApplicationStartDataProto.parseFrom(value)); + } + + private static ApplicationFinishData parseApplicationFinishData(byte[] value) + throws InvalidProtocolBufferException { + return new ApplicationFinishDataPBImpl( + ApplicationFinishDataProto.parseFrom(value)); + } + + private static ApplicationAttemptStartData parseApplicationAttemptStartData( + byte[] value) throws InvalidProtocolBufferException { + return new ApplicationAttemptStartDataPBImpl( + ApplicationAttemptStartDataProto.parseFrom(value)); + } + + private static ApplicationAttemptFinishData + parseApplicationAttemptFinishData(byte[] value) + throws InvalidProtocolBufferException { + return new ApplicationAttemptFinishDataPBImpl( + ApplicationAttemptFinishDataProto.parseFrom(value)); + } + + private static ContainerStartData parseContainerStartData(byte[] value) + throws InvalidProtocolBufferException { + return new ContainerStartDataPBImpl( + ContainerStartDataProto.parseFrom(value)); + } + + private static ContainerFinishData parseContainerFinishData(byte[] value) + throws InvalidProtocolBufferException { + return new ContainerFinishDataPBImpl( + ContainerFinishDataProto.parseFrom(value)); + 
} + + private static void mergeApplicationHistoryData( + ApplicationHistoryData historyData, ApplicationStartData startData) { + historyData.setApplicationName(startData.getApplicationName()); + historyData.setApplicationType(startData.getApplicationType()); + historyData.setQueue(startData.getQueue()); + historyData.setUser(startData.getUser()); + historyData.setSubmitTime(startData.getSubmitTime()); + historyData.setStartTime(startData.getStartTime()); + } + + private static void mergeApplicationHistoryData( + ApplicationHistoryData historyData, ApplicationFinishData finishData) { + historyData.setFinishTime(finishData.getFinishTime()); + historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo()); + historyData.setFinalApplicationStatus(finishData + .getFinalApplicationStatus()); + historyData.setYarnApplicationState(finishData.getYarnApplicationState()); + } + + private static void mergeApplicationAttemptHistoryData( + ApplicationAttemptHistoryData historyData, + ApplicationAttemptStartData startData) { + historyData.setHost(startData.getHost()); + historyData.setRPCPort(startData.getRPCPort()); + historyData.setMasterContainerId(startData.getMasterContainerId()); + } + + private static void mergeApplicationAttemptHistoryData( + ApplicationAttemptHistoryData historyData, + ApplicationAttemptFinishData finishData) { + historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo()); + historyData.setTrackingURL(finishData.getTrackingURL()); + historyData.setFinalApplicationStatus(finishData + .getFinalApplicationStatus()); + historyData.setYarnApplicationAttemptState(finishData + .getYarnApplicationAttemptState()); + } + + private static void mergeContainerHistoryData( + ContainerHistoryData historyData, ContainerStartData startData) { + historyData.setAllocatedResource(startData.getAllocatedResource()); + historyData.setAssignedNode(startData.getAssignedNode()); + historyData.setPriority(startData.getPriority()); + 
historyData.setStartTime(startData.getStartTime()); + } + + private static void mergeContainerHistoryData( + ContainerHistoryData historyData, ContainerFinishData finishData) { + historyData.setFinishTime(finishData.getFinishTime()); + historyData.setDiagnosticsInfo(finishData.getDiagnosticsInfo()); + historyData.setContainerExitStatus(finishData.getContainerExitStatus()); + historyData.setContainerState(finishData.getContainerState()); + } + + private HistoryFileWriter getHistoryFileWriter(ApplicationId appId) + throws IOException { + HistoryFileWriter hfWriter = outstandingWriters.get(appId); + if (hfWriter == null) { + throw new IOException("History file of application " + appId + + " is not opened"); + } + return hfWriter; + } + + private HistoryFileReader getHistoryFileReader(ApplicationId appId) + throws IOException { + Path applicationHistoryFile = new Path(rootDirPath, appId.toString()); + if (!fs.exists(applicationHistoryFile)) { + throw new IOException("History file for application " + appId + + " is not found"); + } + // The history file is still under writing + if (outstandingWriters.containsKey(appId)) { + throw new IOException("History file for application " + appId + + " is under writing"); + } + return new HistoryFileReader(applicationHistoryFile); + } + + private class HistoryFileReader { + + private class Entry { + + private HistoryDataKey key; + private byte[] value; + + public Entry(HistoryDataKey key, byte[] value) { + this.key = key; + this.value = value; + } + } + + private TFile.Reader reader; + private TFile.Reader.Scanner scanner; + + public HistoryFileReader(Path historyFile) throws IOException { + FSDataInputStream fsdis = fs.open(historyFile); + reader = + new TFile.Reader(fsdis, fs.getFileStatus(historyFile).getLen(), + getConfig()); + reset(); + } + + public boolean hasNext() { + return !scanner.atEnd(); + } + + public Entry next() throws IOException { + TFile.Reader.Scanner.Entry entry = scanner.entry(); + DataInputStream dis = 
entry.getKeyStream(); + HistoryDataKey key = new HistoryDataKey(); + key.readFields(dis); + dis = entry.getValueStream(); + byte[] value = new byte[entry.getValueLength()]; + dis.read(value); + scanner.advance(); + return new Entry(key, value); + } + + public void reset() throws IOException { + IOUtils.cleanup(LOG, scanner); + scanner = reader.createScanner(); + } + + public void close() { + IOUtils.cleanup(LOG, scanner, reader); + } + + } + + private class HistoryFileWriter { + + private FSDataOutputStream fsdos; + private TFile.Writer writer; + + public HistoryFileWriter(Path historyFile) throws IOException { + if (fs.exists(historyFile)) { + fsdos = fs.append(historyFile); + } else { + fsdos = fs.create(historyFile); + } + fs.setPermission(historyFile, HISTORY_FILE_UMASK); + writer = + new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get( + YarnConfiguration.FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE, + YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE), null, + getConfig()); + } + + public synchronized void close() { + IOUtils.cleanup(LOG, writer, fsdos); + } + + public synchronized void writeHistoryData(HistoryDataKey key, byte[] value) + throws IOException { + DataOutputStream dos = null; + try { + dos = writer.prepareAppendKey(-1); + key.write(dos); + } finally { + IOUtils.cleanup(LOG, dos); + } + try { + dos = writer.prepareAppendValue(value.length); + dos.write(value); + } finally { + IOUtils.cleanup(LOG, dos); + } + } + + } + + private static class HistoryDataKey implements Writable { + + private String id; + + private String suffix; + + public HistoryDataKey() { + this(null, null); + } + + public HistoryDataKey(String id, String suffix) { + this.id = id; + this.suffix = suffix; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(id); + out.writeUTF(suffix); + } + + @Override + public void readFields(DataInput in) throws IOException { + id = in.readUTF(); + suffix = in.readUTF(); + } + } +} 
http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java new file mode 100644 index 0000000..c226ad3 --- /dev/null +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/MemoryApplicationHistoryStore.java @@ -0,0 +1,274 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; + +/** + * In-memory implementation of {@link ApplicationHistoryStore}. This + * implementation is for test purpose only. If users improperly instantiate it, + * they may encounter reading and writing history data in different memory + * store. 
+ * + */ +@Private +@Unstable +public class MemoryApplicationHistoryStore extends AbstractService implements + ApplicationHistoryStore { + + private final ConcurrentMap<ApplicationId, ApplicationHistoryData> applicationData = + new ConcurrentHashMap<ApplicationId, ApplicationHistoryData>(); + private final ConcurrentMap<ApplicationId, ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData>> applicationAttemptData = + new ConcurrentHashMap<ApplicationId, ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData>>(); + private final ConcurrentMap<ApplicationAttemptId, ConcurrentMap<ContainerId, ContainerHistoryData>> containerData = + new ConcurrentHashMap<ApplicationAttemptId, ConcurrentMap<ContainerId, ContainerHistoryData>>(); + + public MemoryApplicationHistoryStore() { + super(MemoryApplicationHistoryStore.class.getName()); + } + + @Override + public Map<ApplicationId, ApplicationHistoryData> getAllApplications() { + return new HashMap<ApplicationId, ApplicationHistoryData>(applicationData); + } + + @Override + public ApplicationHistoryData getApplication(ApplicationId appId) { + return applicationData.get(appId); + } + + @Override + public Map<ApplicationAttemptId, ApplicationAttemptHistoryData> + getApplicationAttempts(ApplicationId appId) { + ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap = + applicationAttemptData.get(appId); + if (subMap == null) { + return Collections + .<ApplicationAttemptId, ApplicationAttemptHistoryData> emptyMap(); + } else { + return new HashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>( + subMap); + } + } + + @Override + public ApplicationAttemptHistoryData getApplicationAttempt( + ApplicationAttemptId appAttemptId) { + ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap = + applicationAttemptData.get(appAttemptId.getApplicationId()); + if (subMap == null) { + return null; + } else { + return subMap.get(appAttemptId); + } + } + + @Override + public 
ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId) { + ApplicationAttemptHistoryData appAttempt = + getApplicationAttempt(appAttemptId); + if (appAttempt == null || appAttempt.getMasterContainerId() == null) { + return null; + } else { + return getContainer(appAttempt.getMasterContainerId()); + } + } + + @Override + public ContainerHistoryData getContainer(ContainerId containerId) { + Map<ContainerId, ContainerHistoryData> subMap = + containerData.get(containerId.getApplicationAttemptId()); + if (subMap == null) { + return null; + } else { + return subMap.get(containerId); + } + } + + @Override + public Map<ContainerId, ContainerHistoryData> getContainers( + ApplicationAttemptId appAttemptId) throws IOException { + ConcurrentMap<ContainerId, ContainerHistoryData> subMap = + containerData.get(appAttemptId); + if (subMap == null) { + return Collections.<ContainerId, ContainerHistoryData> emptyMap(); + } else { + return new HashMap<ContainerId, ContainerHistoryData>(subMap); + } + } + + @Override + public void applicationStarted(ApplicationStartData appStart) + throws IOException { + ApplicationHistoryData oldData = + applicationData.putIfAbsent(appStart.getApplicationId(), + ApplicationHistoryData.newInstance(appStart.getApplicationId(), + appStart.getApplicationName(), appStart.getApplicationType(), + appStart.getQueue(), appStart.getUser(), appStart.getSubmitTime(), + appStart.getStartTime(), Long.MAX_VALUE, null, null, null)); + if (oldData != null) { + throw new IOException("The start information of application " + + appStart.getApplicationId() + " is already stored."); + } + } + + @Override + public void applicationFinished(ApplicationFinishData appFinish) + throws IOException { + ApplicationHistoryData data = + applicationData.get(appFinish.getApplicationId()); + if (data == null) { + throw new IOException("The finish information of application " + + appFinish.getApplicationId() + " is stored before the start" + + " information."); + } + // 
Make the assumption that YarnApplicationState should not be null if + // the finish information is already recorded + if (data.getYarnApplicationState() != null) { + throw new IOException("The finish information of application " + + appFinish.getApplicationId() + " is already stored."); + } + data.setFinishTime(appFinish.getFinishTime()); + data.setDiagnosticsInfo(appFinish.getDiagnosticsInfo()); + data.setFinalApplicationStatus(appFinish.getFinalApplicationStatus()); + data.setYarnApplicationState(appFinish.getYarnApplicationState()); + } + + @Override + public void applicationAttemptStarted( + ApplicationAttemptStartData appAttemptStart) throws IOException { + ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap = + getSubMap(appAttemptStart.getApplicationAttemptId().getApplicationId()); + ApplicationAttemptHistoryData oldData = + subMap.putIfAbsent(appAttemptStart.getApplicationAttemptId(), + ApplicationAttemptHistoryData.newInstance( + appAttemptStart.getApplicationAttemptId(), + appAttemptStart.getHost(), appAttemptStart.getRPCPort(), + appAttemptStart.getMasterContainerId(), null, null, null, null)); + if (oldData != null) { + throw new IOException("The start information of application attempt " + + appAttemptStart.getApplicationAttemptId() + " is already stored."); + } + } + + @Override + public void applicationAttemptFinished( + ApplicationAttemptFinishData appAttemptFinish) throws IOException { + ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap = + getSubMap(appAttemptFinish.getApplicationAttemptId().getApplicationId()); + ApplicationAttemptHistoryData data = + subMap.get(appAttemptFinish.getApplicationAttemptId()); + if (data == null) { + throw new IOException("The finish information of application attempt " + + appAttemptFinish.getApplicationAttemptId() + " is stored before" + + " the start information."); + } + // Make the assumption that YarnApplicationAttemptState should not be null + // if the finish 
information is already recorded + if (data.getYarnApplicationAttemptState() != null) { + throw new IOException("The finish information of application attempt " + + appAttemptFinish.getApplicationAttemptId() + " is already stored."); + } + data.setTrackingURL(appAttemptFinish.getTrackingURL()); + data.setDiagnosticsInfo(appAttemptFinish.getDiagnosticsInfo()); + data + .setFinalApplicationStatus(appAttemptFinish.getFinalApplicationStatus()); + data.setYarnApplicationAttemptState(appAttemptFinish + .getYarnApplicationAttemptState()); + } + + private ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> + getSubMap(ApplicationId appId) { + applicationAttemptData + .putIfAbsent( + appId, + new ConcurrentHashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>()); + return applicationAttemptData.get(appId); + } + + @Override + public void containerStarted(ContainerStartData containerStart) + throws IOException { + ConcurrentMap<ContainerId, ContainerHistoryData> subMap = + getSubMap(containerStart.getContainerId().getApplicationAttemptId()); + ContainerHistoryData oldData = + subMap.putIfAbsent(containerStart.getContainerId(), + ContainerHistoryData.newInstance(containerStart.getContainerId(), + containerStart.getAllocatedResource(), + containerStart.getAssignedNode(), containerStart.getPriority(), + containerStart.getStartTime(), Long.MAX_VALUE, null, + Integer.MAX_VALUE, null)); + if (oldData != null) { + throw new IOException("The start information of container " + + containerStart.getContainerId() + " is already stored."); + } + } + + @Override + public void containerFinished(ContainerFinishData containerFinish) + throws IOException { + ConcurrentMap<ContainerId, ContainerHistoryData> subMap = + getSubMap(containerFinish.getContainerId().getApplicationAttemptId()); + ContainerHistoryData data = subMap.get(containerFinish.getContainerId()); + if (data == null) { + throw new IOException("The finish information of container " + + 
containerFinish.getContainerId() + " is stored before" + + " the start information."); + } + // Make the assumption that ContainerState should not be null if + // the finish information is already recorded + if (data.getContainerState() != null) { + throw new IOException("The finish information of container " + + containerFinish.getContainerId() + " is already stored."); + } + data.setFinishTime(containerFinish.getFinishTime()); + data.setDiagnosticsInfo(containerFinish.getDiagnosticsInfo()); + data.setContainerExitStatus(containerFinish.getContainerExitStatus()); + data.setContainerState(containerFinish.getContainerState()); + } + + private ConcurrentMap<ContainerId, ContainerHistoryData> getSubMap( + ApplicationAttemptId appAttemptId) { + containerData.putIfAbsent(appAttemptId, + new ConcurrentHashMap<ContainerId, ContainerHistoryData>()); + return containerData.get(appAttemptId); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java new file mode 100644 index 0000000..3660c10 --- /dev/null +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/NullApplicationHistoryStore.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; +import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; + +/** + * 
Dummy implementation of {@link ApplicationHistoryStore}. If this + * implementation is used, no history data will be persisted. + * + */ +@Unstable +@Private +public class NullApplicationHistoryStore extends AbstractService implements + ApplicationHistoryStore { + + public NullApplicationHistoryStore() { + super(NullApplicationHistoryStore.class.getName()); + } + + @Override + public void applicationStarted(ApplicationStartData appStart) + throws IOException { + } + + @Override + public void applicationFinished(ApplicationFinishData appFinish) + throws IOException { + } + + @Override + public void applicationAttemptStarted( + ApplicationAttemptStartData appAttemptStart) throws IOException { + } + + @Override + public void applicationAttemptFinished( + ApplicationAttemptFinishData appAttemptFinish) throws IOException { + } + + @Override + public void containerStarted(ContainerStartData containerStart) + throws IOException { + } + + @Override + public void containerFinished(ContainerFinishData containerFinish) + throws IOException { + } + + @Override + public ApplicationHistoryData getApplication(ApplicationId appId) + throws IOException { + return null; + } + + @Override + public Map<ApplicationId, ApplicationHistoryData> getAllApplications() + throws IOException { + return Collections.emptyMap(); + } + + @Override + public Map<ApplicationAttemptId, ApplicationAttemptHistoryData> + getApplicationAttempts(ApplicationId appId) throws IOException { + return Collections.emptyMap(); + } + + @Override + public ApplicationAttemptHistoryData getApplicationAttempt( + ApplicationAttemptId appAttemptId) throws IOException { + return null; + } + + @Override + public ContainerHistoryData getContainer(ContainerId containerId) + throws IOException { + return null; + } + + @Override + public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId) + throws IOException { + return null; + } + + @Override + public Map<ContainerId, ContainerHistoryData> getContainers( + 
ApplicationAttemptId appAttemptId) throws IOException { + return Collections.emptyMap(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/865d187e/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java new file mode 100644 index 0000000..e702fc0 --- /dev/null +++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java @@ -0,0 +1,294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;

import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.annotate.JsonSubTypes;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.File;
import java.io.IOException;
import java.util.Date;

/**
 * Base class for a checkpoint-driven aggregation loop.
 *
 * <p>{@link #run()} never returns. On every pass it reads a checkpoint
 * timestamp from the file at {@link #CHECKPOINT_LOCATION}, asks the subclass
 * to aggregate the window {@code [checkpoint, checkpoint + sleepInterval]}
 * through {@link #doWork(long, long)}, advances the checkpoint when the
 * subclass reports success, and then sleeps for the sleep interval.
 */
public abstract class AbstractTimelineAggregator implements Runnable {
  protected final PhoenixHBaseAccessor hBaseAccessor;
  /** Path of the file that stores the last checkpoint (epoch milliseconds). */
  protected final String CHECKPOINT_LOCATION;
  /** Per-instance logger, named after the concrete subclass. */
  private final Log LOG;
  /**
   * How far in the past (ms) the very first checkpoint is placed: two minutes,
   * to allow the agents/collectors to catch up.
   */
  static final long checkpointDelay = 120000;
  // Not referenced in this class; presumably the JDBC fetch size used by
  // subclasses -- TODO confirm against callers.
  static final Integer RESULTSET_FETCH_SIZE = 5000;
  /** Shared JSON mapper used by {@link MetricAggregate#toJSON()}. */
  private static final ObjectMapper mapper = new ObjectMapper();

  /**
   * @param hBaseAccessor      accessor made available to subclasses
   * @param checkpointLocation path of the checkpoint file
   */
  public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
                                    String checkpointLocation) {
    this.hBaseAccessor = hBaseAccessor;
    this.CHECKPOINT_LOCATION = checkpointLocation;
    this.LOG = LogFactory.getLog(this.getClass());
  }

  /**
   * Runs the aggregation loop forever.
   *
   * <p>Per iteration:
   * <ol>
   *   <li>Read the checkpoint; discard it if it is older than
   *       {@link #getCheckpointCutOffInterval()}.</li>
   *   <li>If there is no usable checkpoint, store
   *       {@code now - checkpointDelay} and skip aggregation for this
   *       pass.</li>
   *   <li>Otherwise call {@link #doWork(long, long)} and, on success, move the
   *       checkpoint forward by one sleep interval.</li>
   *   <li>Sleep for the sleep interval.</li>
   * </ol>
   */
  @Override
  public void run() {
    LOG.info("Started Timeline aggregator thread @ " + new Date());
    // Unbox once instead of on every arithmetic use inside the loop.
    final long sleepInterval = getSleepInterval();

    while (true) {
      long currentTime = System.currentTimeMillis();
      long lastCheckPointTime = -1;

      try {
        lastCheckPointTime = readCheckPoint();
        if (isLastCheckPointTooOld(lastCheckPointTime)) {
          LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " +
            "lastCheckPointTime = " + lastCheckPointTime);
          lastCheckPointTime = -1;
        }
        if (lastCheckPointTime == -1) {
          // Assuming first run, save checkpoint and sleep.
          // Set checkpoint to 2 minutes in the past to allow the
          // agents/collectors to catch up
          saveCheckPoint(currentTime - checkpointDelay);
        }
      } catch (IOException io) {
        LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io);
      }

      if (lastCheckPointTime != -1) {
        LOG.info("Last check point time: " + lastCheckPointTime + ", " +
          "lagBy: " + ((System.currentTimeMillis() - lastCheckPointTime)) / 1000);
        boolean success =
          doWork(lastCheckPointTime, lastCheckPointTime + sleepInterval);
        if (success) {
          try {
            saveCheckPoint(lastCheckPointTime + sleepInterval);
          } catch (IOException io) {
            // Keep the cause in the log; the old checkpoint stays in place so
            // the same window is aggregated again on the next pass.
            LOG.warn("Error saving checkpoint, restarting aggregation at " +
              "previous checkpoint.", io);
          }
        }
      }

      try {
        Thread.sleep(sleepInterval);
      } catch (InterruptedException e) {
        // Deliberately not re-interrupting: with the flag set the next
        // Thread.sleep would throw immediately and the loop would spin.
        LOG.info("Sleep interrupted, continuing with aggregation.");
      }
    }
  }

  /**
   * @return {@code true} when {@code checkpoint} is set (not -1) and lies
   *         further in the past than {@link #getCheckpointCutOffInterval()}
   */
  private boolean isLastCheckPointTooOld(long checkpoint) {
    return checkpoint != -1 &&
      ((System.currentTimeMillis() - checkpoint) > getCheckpointCutOffInterval());
  }

  /**
   * Reads the checkpoint timestamp from {@link #CHECKPOINT_LOCATION}.
   *
   * <p>Surrounding whitespace (for example a trailing newline left by an
   * editor) is ignored. Content that is not a valid long is treated like a
   * missing checkpoint instead of letting {@link NumberFormatException}
   * escape and terminate the aggregator thread.
   *
   * @return the stored timestamp, or -1 when the file is absent, empty,
   *         unreadable or unparseable
   */
  private long readCheckPoint() {
    try {
      File checkpoint = new File(CHECKPOINT_LOCATION);
      if (checkpoint.exists()) {
        String contents = FileUtils.readFileToString(checkpoint);
        if (contents != null) {
          String trimmed = contents.trim();
          if (!trimmed.isEmpty()) {
            return Long.parseLong(trimmed);
          }
        }
      }
    } catch (IOException io) {
      LOG.debug("Unable to read checkpoint at " + CHECKPOINT_LOCATION, io);
    } catch (NumberFormatException nfe) {
      LOG.warn("Ignoring unparseable checkpoint at " + CHECKPOINT_LOCATION, nfe);
    }
    return -1;
  }

  /**
   * Overwrites the checkpoint file with {@code checkpointTime}.
   *
   * <p>There is no separate exists/createNewFile step:
   * {@code FileUtils.writeStringToFile} creates the file when it is missing
   * and, per the commons-io documentation (since 1.3), also any missing
   * parent directories.
   *
   * @param checkpointTime timestamp to persist
   * @throws IOException if the file cannot be written
   */
  private void saveCheckPoint(long checkpointTime) throws IOException {
    File checkpoint = new File(CHECKPOINT_LOCATION);
    FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime));
  }

  // TODO: Abstract out doWork implementation for cluster and host levels
  /**
   * Aggregates one window.
   *
   * @param startTime window start; {@link #run()} passes the last checkpoint
   * @param endTime   window end; {@link #run()} passes checkpoint plus sleep
   *                  interval
   * @return {@code true} if the checkpoint may be advanced to {@code endTime}
   */
  protected abstract boolean doWork(long startTime, long endTime);

  /**
   * @return pause between passes in milliseconds (handed to
   *         {@link Thread#sleep(long)}); also the width of each window
   */
  protected abstract Long getSleepInterval();

  /**
   * @return maximum checkpoint age in milliseconds before the checkpoint is
   *         discarded and re-initialised
   */
  protected abstract Long getCheckpointCutOffInterval();

  /**
   * Mutable holder for the sum, deviation, max and min of a metric.
   */
  @JsonSubTypes({ @JsonSubTypes.Type(value = MetricClusterAggregate.class),
    @JsonSubTypes.Type(value = MetricHostAggregate.class) })
  @InterfaceAudience.Public
  @InterfaceStability.Unstable
  public static class MetricAggregate {
    protected Double sum = 0.0;
    protected Double deviation;
    // NOTE(review): Double.MIN_VALUE is the smallest POSITIVE double, not the
    // most negative one, so updateMax() never accepts zero or negative
    // samples while max still holds this seed. Left unchanged because
    // serialized aggregates and callers outside this class may depend on the
    // value -- confirm before switching to -Double.MAX_VALUE.
    protected Double max = Double.MIN_VALUE;
    protected Double min = Double.MAX_VALUE;

    public MetricAggregate() {}

    protected MetricAggregate(Double sum, Double deviation, Double max, Double min) {
      this.sum = sum;
      this.deviation = deviation;
      this.max = max;
      this.min = min;
    }

    /** Adds {@code sum} to the running sum. */
    void updateSum(Double sum) {
      this.sum += sum;
    }

    /** Raises the stored maximum if {@code max} is strictly greater. */
    void updateMax(Double max) {
      if (max > this.max) {
        this.max = max;
      }
    }

    /** Lowers the stored minimum if {@code min} is strictly smaller. */
    void updateMin(Double min) {
      if (min < this.min) {
        this.min = min;
      }
    }

    @JsonProperty("sum")
    Double getSum() {
      return sum;
    }

    @JsonProperty("deviation")
    Double getDeviation() {
      return deviation;
    }

    @JsonProperty("max")
    Double getMax() {
      return max;
    }

    @JsonProperty("min")
    Double getMin() {
      return min;
    }

    public void setSum(Double sum) {
      this.sum = sum;
    }

    public void setDeviation(Double deviation) {
      this.deviation = deviation;
    }

    public void setMax(Double max) {
      this.max = max;
    }

    public void setMin(Double min) {
      this.min = min;
    }

    /**
     * @return this aggregate serialized to JSON by the shared mapper
     * @throws IOException if serialization fails
     */
    public String toJSON() throws IOException {
      return mapper.writeValueAsString(this);
    }
  }

  /**
   * Aggregate that additionally carries a host count.
   */
  public static class MetricClusterAggregate extends MetricAggregate {
    private int numberOfHosts;

    @JsonCreator
    public MetricClusterAggregate() {}

    MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
                           Double max, Double min) {
      super(sum, deviation, max, min);
      this.numberOfHosts = numberOfHosts;
    }

    @JsonProperty("numberOfHosts")
    int getNumberOfHosts() {
      return numberOfHosts;
    }

    /** Adds {@code count} to the stored host count. */
    void updateNumberOfHosts(int count) {
      this.numberOfHosts += count;
    }

    public void setNumberOfHosts(int numberOfHosts) {
      this.numberOfHosts = numberOfHosts;
    }

    @Override
    public String toString() {
      // Label matches the class, consistent with MetricHostAggregate.
      return "MetricClusterAggregate{" +
        "sum=" + sum +
        ", numberOfHosts=" + numberOfHosts +
        ", deviation=" + deviation +
        ", max=" + max +
        ", min=" + min +
        '}';
    }
  }

  /**
   * Represents a collection of minute based aggregation of values for
   * resolution greater than a minute.
   */
  public static class MetricHostAggregate extends MetricAggregate {

    @JsonCreator
    public MetricHostAggregate() {
      super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE);
    }

    /**
     * Folds another host aggregate into this one by updating max, min and sum
     * from its values.
     */
    void updateAggregates(MetricHostAggregate hostAggregate) {
      updateMax(hostAggregate.getMax());
      updateMin(hostAggregate.getMin());
      updateSum(hostAggregate.getSum());
    }

    /**
     * Reuse sum to indicate average for a host for the hour: the stored value
     * becomes the midpoint of the current value and {@code sum}.
     *
     * NOTE(review): this is a pairwise midpoint, not an arithmetic mean over
     * all samples, and the first sample is averaged with the 0.0 seed.
     * Confirm that this is intended.
     */
    @Override
    void updateSum(Double sum) {
      this.sum = (this.sum + sum) / 2;
    }

    @Override
    public String toString() {
      return "MetricHostAggregate{" +
        "sum=" + sum +
        ", deviation=" + deviation +
        ", max=" + max +
        ", min=" + min +
        '}';
    }
  }
}