AMBARI-14806. Provide Metrics discovery API for AMS. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/646fb429 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/646fb429 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/646fb429 Branch: refs/heads/trunk Commit: 646fb429f53adaa687dd9d4e7acdff617203caf6 Parents: 73b5399 Author: Siddharth Wagle <swa...@hortonworks.com> Authored: Wed Jan 27 15:51:26 2016 -0800 Committer: Siddharth Wagle <swa...@hortonworks.com> Committed: Wed Jan 27 15:51:26 2016 -0800 ---------------------------------------------------------------------- .../sink/timeline/MetadataException.java | 28 ++ .../sink/timeline/TimelineMetricMetadata.java | 163 ++++++++++++ .../timeline/HBaseTimelineMetricStore.java | 73 ++++-- .../metrics/timeline/PhoenixHBaseAccessor.java | 253 ++++++++++++++++++- .../timeline/TimelineMetricConfiguration.java | 9 + .../metrics/timeline/TimelineMetricStore.java | 22 +- .../TimelineMetricAggregatorFactory.java | 5 +- .../TimelineMetricAppAggregator.java | 28 +- .../TimelineMetricClusterAggregatorSecond.java | 5 +- .../discovery/TimelineMetricMetadataKey.java | 56 ++++ .../TimelineMetricMetadataManager.java | 187 ++++++++++++++ .../discovery/TimelineMetricMetadataSync.java | 105 ++++++++ .../timeline/query/PhoenixTransactSQL.java | 37 ++- .../webapp/TimelineWebServices.java | 59 ++++- .../TestApplicationHistoryServer.java | 11 +- .../timeline/AbstractMiniHBaseClusterTest.java | 8 +- .../timeline/ITPhoenixHBaseAccessor.java | 9 +- .../timeline/TestTimelineMetricStore.java | 14 + .../aggregators/ITClusterAggregator.java | 16 +- .../timeline/discovery/TestMetadataManager.java | 112 ++++++++ 20 files changed, 1123 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/646fb429/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetadataException.java 
---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetadataException.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetadataException.java new file mode 100644 index 0000000..01230af --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetadataException.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.sink.timeline; + +/** + * Marker for checked Exceptions thrown from Metadata management layer. 
+ */ +public class MetadataException extends Exception { + // Constructs the exception with the given detail message + public MetadataException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/646fb429/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java new file mode 100644 index 0000000..0624f9c --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.metrics2.sink.timeline; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.codehaus.jackson.annotate.JsonIgnore; +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + +@XmlRootElement(name = "metric_metadata") +@XmlAccessorType(XmlAccessType.NONE) +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class TimelineMetricMetadata { + private String metricName; + private String appId; + private String units; + private MetricType type = MetricType.UNDEFINED; + private Long seriesStartTime; + boolean supportsAggregates = true; + // Serialization ignored helper flag + boolean isPersisted = false; + + public enum MetricType { + GAUGE, // Can vary in both directions + COUNTER, // Single dimension + UNDEFINED // Default + } + + // Default constructor + public TimelineMetricMetadata() { + } + + public TimelineMetricMetadata(String metricName, String appId, String units, + MetricType type, Long seriesStartTime, + boolean supportsAggregates) { + this.metricName = metricName; + this.appId = appId; + this.units = units; + this.type = type; + this.seriesStartTime = seriesStartTime; + this.supportsAggregates = supportsAggregates; + } + + @XmlElement(name = "metricname") + public String getMetricName() { + return metricName; + } + + public void setMetricName(String metricName) { + this.metricName = metricName; + } + + // This is the key for the webservice hence ignored. 
+ //@XmlElement(name = "appid") + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + @XmlElement(name = "units") + public String getUnits() { + return units; + } + + public void setUnits(String units) { + this.units = units; + } + + @XmlElement(name = "type") + public MetricType getType() { + return type; + } + + public void setType(MetricType type) { + this.type = type; + } + + @XmlElement(name = "seriesStartTime") + public Long getSeriesStartTime() { + return seriesStartTime; + } + + public void setSeriesStartTime(Long seriesStartTime) { + this.seriesStartTime = seriesStartTime; + } + + @XmlElement(name = "supportsAggregation") + public boolean isSupportsAggregates() { + return supportsAggregates; + } + + public void setSupportsAggregates(boolean supportsAggregates) { + this.supportsAggregates = supportsAggregates; + } + + @JsonIgnore + public boolean isPersisted() { + return isPersisted; + } + + public void setIsPersisted(boolean isPersisted) { + this.isPersisted = isPersisted; + } + + /** + * Assumes the key of the object being compared is the same as @TimelineMetricMetadata + * @param metadata @TimelineMetricMetadata to be compared + */ + public boolean needsToBeSynced(TimelineMetricMetadata metadata) throws MetadataException { + if (!this.metricName.equals(metadata.getMetricName()) || + !this.appId.equals(metadata.getAppId())) { + throw new MetadataException("Unexpected argument: metricName = " + + metadata.getMetricName() + ", appId = " + metadata.getAppId()); + } + + // Series start time should never change + return (this.units != null && !this.units.equals(metadata.getUnits())) || + (this.type != null && !this.type.equals(metadata.getType())) || + //!this.lastRecordedTime.equals(metadata.getLastRecordedTime()) || // TODO: support + !this.supportsAggregates == metadata.isSupportsAggregates(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null 
|| getClass() != o.getClass()) return false; + + TimelineMetricMetadata that = (TimelineMetricMetadata) o; + + if (!metricName.equals(that.metricName)) return false; + return !(appId != null ? !appId.equals(that.appId) : that.appId != null); + + } + + @Override + public int hashCode() { + int result = metricName.hashCode(); + result = 31 * result + (appId != null ? appId.hashCode() : 0); + return result; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/646fb429/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java index c4e946a..c30a354 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -23,12 +23,15 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; @@ -39,15 +42,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES; @@ -58,8 +56,8 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin private final TimelineMetricConfiguration configuration; private PhoenixHBaseAccessor hBaseAccessor; private static volatile boolean isInitialized = false; - private final ScheduledExecutorService executorService = - Executors.newSingleThreadScheduledExecutor(); + private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + 
private TimelineMetricMetadataManager metricMetadataManager; /** * Construct the service. @@ -81,6 +79,9 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin if (!isInitialized) { hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf); hBaseAccessor.initMetricSchema(); + // Initialize metadata from store + metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor, metricsConf); + metricMetadataManager.initializeMetadata(); if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) { LOG.info("Using group by aggregators for aggregating host and cluster metrics."); @@ -88,7 +89,7 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin // Start the cluster aggregator second TimelineMetricAggregator secondClusterAggregator = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hBaseAccessor, metricsConf); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hBaseAccessor, metricsConf, metricMetadataManager); if (!secondClusterAggregator.isDisabled()) { Thread aggregatorThread = new Thread(secondClusterAggregator); aggregatorThread.start(); @@ -188,8 +189,7 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin TimelineMetrics metrics; if (hostnames == null || hostnames.isEmpty()) { - metrics = hBaseAccessor.getAggregateMetricRecords(condition, - metricFunctions); + metrics = hBaseAccessor.getAggregateMetricRecords(condition, metricFunctions); } else { metrics = hBaseAccessor.getMetricRecords(condition, metricFunctions); } @@ -199,7 +199,7 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin private TimelineMetrics postProcessMetrics(TimelineMetrics metrics) { List<TimelineMetric> metricsList = metrics.getMetrics(); - for (TimelineMetric metric: metricsList){ + for (TimelineMetric metric : metricsList){ String name = metric.getMetricName(); if 
(name.contains("._rate")){ updateValueAsRate(metric.getMetricValues()); @@ -250,22 +250,17 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin // fallback to VALUE, and fullMetricName } - addFunctionToMetricName(metricsFunctions, cleanMetricName, function); + List<Function> functionsList = metricsFunctions.get(cleanMetricName); + if (functionsList == null) { + functionsList = new ArrayList<Function>(1); + } + functionsList.add(function); + metricsFunctions.put(cleanMetricName, functionsList); } return metricsFunctions; } - private static void addFunctionToMetricName( - HashMap<String, List<Function>> metricsFunctions, String cleanMetricName, - Function function) { - - List<Function> functionsList = metricsFunctions.get(cleanMetricName); - if (functionsList==null) functionsList = new ArrayList<Function>(1); - functionsList.add(function); - metricsFunctions.put(cleanMetricName, functionsList); - } - @Override public TimelineMetric getTimelineMetric(String metricName, List<String> hostnames, String applicationId, String instanceId, Long startTime, @@ -314,16 +309,38 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin return metric; } - @Override - public TimelinePutResponse putMetrics(TimelineMetrics metrics) - throws SQLException, IOException { - + public TimelinePutResponse putMetrics(TimelineMetrics metrics) throws SQLException, IOException { // Error indicated by the Sql exception TimelinePutResponse response = new TimelinePutResponse(); - hBaseAccessor.insertMetricRecords(metrics); + hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics); return response; } + + @Override + public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata() throws SQLException, IOException { + Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata = + metricMetadataManager.getMetadataCache(); + + // Group Metadata by AppId + Map<String, List<TimelineMetricMetadata>> 
metadataByAppId = new HashMap<>(); + for (TimelineMetricMetadata metricMetadata : metadata.values()) { + List<TimelineMetricMetadata> metadataList = metadataByAppId.get(metricMetadata.getAppId()); + if (metadataList == null) { + metadataList = new ArrayList<>(); + metadataByAppId.put(metricMetadata.getAppId(), metadataList); + } + + metadataList.add(metricMetadata); + } + + return metadataByAppId; + } + + @Override + public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException { + return metricMetadataManager.getHostedAppsCache(); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/646fb429/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index 8325fb1..980c4af 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; +import com.google.common.base.Enums; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.hadoop.metrics2.sink.timeline.Precision; import 
org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; @@ -34,6 +36,8 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource; @@ -51,12 +55,18 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata.*; 
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATE_TABLE_SPLIT_POINTS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATORS_SKIP_BLOCK_CACHE; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL; @@ -75,12 +85,16 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_SPLIT_POINTS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_METADATA_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING; import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_HOSTED_APPS_METADATA_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_METADATA_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; @@ -92,6 +106,8 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_HOSTED_APPS_METADATA_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METADATA_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL; /** @@ -260,6 +276,14 @@ public class PhoenixHBaseAccessor { conn = getConnectionRetryingOnException(); stmt = conn.createStatement(); + // Metadata + String metadataSql = 
String.format(CREATE_METRICS_METADATA_TABLE_SQL, + encoding, compression); + stmt.executeUpdate(metadataSql); + String hostedAppSql = String.format(CREATE_HOSTED_APPS_METADATA_TABLE_SQL, + encoding, compression); + stmt.executeUpdate(hostedAppSql); + // Host level String precisionSql = String.format(CREATE_METRICS_TABLE_SQL, encoding, precisionTtl, compression); @@ -371,8 +395,8 @@ public class PhoenixHBaseAccessor { return ""; } - public void insertMetricRecords(TimelineMetrics metrics) throws SQLException, IOException { - + public void insertMetricRecordsWithMetadata(TimelineMetricMetadataManager metadataManager, + TimelineMetrics metrics) throws SQLException, IOException { List<TimelineMetric> timelineMetrics = metrics.getMetrics(); if (timelineMetrics == null || timelineMetrics.isEmpty()) { LOG.debug("Empty metrics insert request."); @@ -422,8 +446,16 @@ public class PhoenixHBaseAccessor { try { metricRecordStmt.executeUpdate(); + + // Write to metadata cache on successful write to store + metadataManager.putIfModifiedTimelineMetricMetadata( + metadataManager.getTimelineMetricMetadata(metric)); + + metadataManager.putIfModifiedHostedAppsMetadata( + metric.getHostName(), metric.getAppId()); + } catch (SQLException sql) { - LOG.error(sql); + LOG.error("Failed on insert records to store.", sql); } } @@ -448,6 +480,10 @@ public class PhoenixHBaseAccessor { } } + public void insertMetricRecords(TimelineMetrics metrics) throws SQLException, IOException { + insertMetricRecordsWithMetadata(null, metrics); + } + @SuppressWarnings("unchecked") public TimelineMetrics getMetricRecords( final Condition condition, Map<String, List<Function>> metricFunctions) @@ -566,8 +602,7 @@ public class PhoenixHBaseAccessor { } } else { - TimelineMetric metric; - metric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricFromResultSet(rs); + TimelineMetric metric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricFromResultSet(rs); if (condition.isGrouped()) { 
metrics.addOrMergeTimelineMetric(metric); @@ -1032,4 +1067,212 @@ public class PhoenixHBaseAccessor { public boolean isSkipBlockCacheForAggregatorsEnabled() { return skipBlockCacheForAggregatorsEnabled; } + + /** + * One time save of metadata when discovering topology during aggregation. + * @throws SQLException + */ + public void saveHostAppsMetadata(Map<String, Set<String>> hostedApps) throws SQLException { + Connection conn = getConnection(); + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(UPSERT_HOSTED_APPS_METADATA_SQL); + int rowCount = 0; + + for (Map.Entry<String, Set<String>> hostedAppsEntry : hostedApps.entrySet()) { + if (LOG.isTraceEnabled()) { + LOG.trace("HostedAppsMetadata: " + hostedAppsEntry); + } + + stmt.clearParameters(); + stmt.setString(1, hostedAppsEntry.getKey()); + stmt.setString(2, StringUtils.join(hostedAppsEntry.getValue(), ",")); + try { + stmt.executeUpdate(); + rowCount++; + } catch (SQLException sql) { + LOG.error("Error saving hosted apps metadata.", sql); + } + } + + conn.commit(); + LOG.info("Saved " + rowCount + " hosted apps metadata records."); + + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } + } + } + } + + /** + * Save metadata on updates. 
+ * @param metricMetadata @Collection<@TimelineMetricMetadata> + * @throws SQLException + */ + public void saveMetricMetadata(Collection<TimelineMetricMetadata> metricMetadata) throws SQLException { + if (metricMetadata.isEmpty()) { + LOG.info("No metadata records to save."); + return; + } + + Connection conn = getConnection(); + PreparedStatement stmt = null; + + try { + stmt = conn.prepareStatement(UPSERT_METADATA_SQL); + int rowCount = 0; + + for (TimelineMetricMetadata metadata : metricMetadata) { + if (LOG.isTraceEnabled()) { + LOG.trace("TimelineMetricMetadata: metricName = " + metadata.getMetricName() + + ", appId = " + metadata.getAppId() + + ", seriesStartTime = " + metadata.getSeriesStartTime() + ); + } + + stmt.clearParameters(); + stmt.setString(1, metadata.getMetricName()); + stmt.setString(2, metadata.getAppId()); + stmt.setString(3, metadata.getUnits()); + stmt.setString(4, metadata.getType().name()); + stmt.setLong(5, metadata.getSeriesStartTime()); + stmt.setBoolean(6, metadata.isSupportsAggregates()); + + try { + stmt.executeUpdate(); + rowCount++; + } catch (SQLException sql) { + LOG.error("Error saving metadata.", sql); + } + } + + conn.commit(); + LOG.info("Saved " + rowCount + " metadata records."); + + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } + } + } + } + + public Map<String, Set<String>> getHostedAppsMetadata() throws SQLException { + Map<String, Set<String>> hostedAppMap = new HashMap<>(); + Connection conn = getConnection(); + PreparedStatement stmt = null; + ResultSet rs = null; + + try { + stmt = conn.prepareStatement(GET_HOSTED_APPS_METADATA_SQL); + rs = stmt.executeQuery(); + + while (rs.next()) { + hostedAppMap.put(rs.getString("HOSTNAME"), + new HashSet<>(Arrays.asList(StringUtils.split(rs.getString("APP_IDS"), ",")))); + } + + } finally { + if (rs != null) { + try { + 
rs.close(); + } catch (SQLException e) { + // Ignore + } + } + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } + } + } + + return hostedAppMap; + } + + // No filter criteria support for now. + public Map<TimelineMetricMetadataKey, TimelineMetricMetadata> getTimelineMetricMetadata() throws SQLException { + Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadataMap = new HashMap<>(); + Connection conn = getConnection(); + PreparedStatement stmt = null; + ResultSet rs = null; + + try { + stmt = conn.prepareStatement(GET_METRIC_METADATA_SQL); + rs = stmt.executeQuery(); + + while (rs.next()) { + String metricName = rs.getString("METRIC_NAME"); + String appId = rs.getString("APP_ID"); + TimelineMetricMetadata metadata = new TimelineMetricMetadata( + metricName, + appId, + rs.getString("UNITS"), + Enums.getIfPresent(MetricType.class, rs.getString("TYPE")).or(MetricType.UNDEFINED), + rs.getLong("START_TIME"), + rs.getBoolean("SUPPORTS_AGGREGATION") + ); + + TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(metricName, appId); + metadata.setIsPersisted(true); // Always true on retrieval + metadataMap.put(key, metadata); + } + + } finally { + if (rs != null) { + try { + rs.close(); + } catch (SQLException e) { + // Ignore + } + } + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } + } + } + + return metadataMap; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/646fb429/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java index ea48efe..46f61fb 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java @@ -202,6 +202,15 @@ public class TimelineMetricConfiguration { public static final String AGGREGATORS_SKIP_BLOCK_CACHE = "timeline.metrics.aggregators.skip.blockcache.enabled"; + public static final String DISABLE_METRIC_METADATA_MGMT = + "timeline.metrics.service.metadata.management.disabled"; + + public static final String METRICS_METADATA_SYNC_INIT_DELAY = + "timeline.metrics.service.metadata.sync.init.delay"; + + public static final String METRICS_METADATA_SYNC_SCHEDULE_DELAY = + "timeline.metrics.service.metadata.sync.delay"; + public static final String HOST_APP_ID = "HOST"; private Configuration hbaseConf; http://git-wip-us.apache.org/repos/asf/ambari/blob/646fb429/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java index e062ca0..0aa102e 100644 --- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java @@ -19,11 +19,14 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import java.io.IOException; import java.sql.SQLException; import java.util.List; +import java.util.Map; +import java.util.Set; public interface TimelineMetricStore { /** @@ -67,6 +70,21 @@ public interface TimelineMetricStore { * @return An {@link org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse}. * @throws SQLException, IOException */ - TimelinePutResponse putMetrics(TimelineMetrics metrics) - throws SQLException, IOException; + TimelinePutResponse putMetrics(TimelineMetrics metrics) throws SQLException, IOException; + + /** + * Return all metrics metadata that have been written to the store. 
+ * @return { appId : [ @TimelineMetricMetadata ] } + * @throws SQLException + * @throws IOException + */ + Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata() throws SQLException, IOException; + + /** + * Returns all hosts that have written metrics with the apps on the host + * @return { hostname : [ appIds ] } + * @throws SQLException + * @throws IOException + */ + Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException; } http://git-wip-us.apache.org/repos/asf/ambari/blob/646fb429/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java index f0b2fda..cc85c56 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import org.apache.commons.io.FilenameUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import static 
java.util.concurrent.TimeUnit.SECONDS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER; @@ -227,7 +228,8 @@ public class TimelineMetricAggregatorFactory { * Timeslice : 30 sec */ public static TimelineMetricAggregator createTimelineClusterAggregatorSecond( - PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, + TimelineMetricMetadataManager metadataManager) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -251,6 +253,7 @@ public class TimelineMetricAggregatorFactory { // Second based aggregation have added responsibility of time slicing return new TimelineMetricClusterAggregatorSecond( "TimelineClusterAggregatorSecond", + metadataManager, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, http://git-wip-us.apache.org/repos/asf/ambari/blob/646fb429/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java index 0c8ded2..05beb76 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java +++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java @@ -21,12 +21,17 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; + import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_APP_ID; @@ -40,13 +45,13 @@ public class TimelineMetricAppAggregator { private static final Log LOG = LogFactory.getLog(TimelineMetricAppAggregator.class); // Lookup to check candidacy of an app private final List<String> appIdsToAggregate; - // Map to lookup apps on a host - private Map<String, List<String>> hostedAppsMap = new HashMap<String, List<String>>(); - + private final Map<String, Set<String>> hostedAppsMap; Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics; - public TimelineMetricAppAggregator(Configuration metricsConf) { + public TimelineMetricAppAggregator(TimelineMetricMetadataManager metadataManager, + Configuration metricsConf) { appIdsToAggregate = getAppIdsForHostAggregation(metricsConf); + hostedAppsMap = metadataManager.getHostedAppsCache(); LOG.info("AppIds configured for aggregation: " + appIdsToAggregate); } @@ -67,15 +72,6 @@ public class TimelineMetricAppAggregator { } /** - * Useful for resetting apps that no-longer need aggregation without restart. 
- */ - public void destroy() { - LOG.debug("Cleanup aggregated data as well as in-memory state."); - aggregateClusterMetrics = null; - hostedAppsMap = new HashMap<String, List<String>>(); - } - - /** * Calculate aggregates if the clusterMetric is a Host metric for recorded * apps that are housed by this host. * @@ -101,9 +97,9 @@ public class TimelineMetricAppAggregator { // Build the hostedapps map if not a host metric // Check app candidacy for host aggregation if (appIdsToAggregate.contains(appId)) { - List<String> appIds = hostedAppsMap.get(hostname); + Set<String> appIds = hostedAppsMap.get(hostname); if (appIds == null) { - appIds = new ArrayList<String>(); + appIds = new HashSet<>(); hostedAppsMap.put(hostname, appIds); } if (!appIds.contains(appId)) { @@ -126,7 +122,7 @@ public class TimelineMetricAppAggregator { return; } - List<String> apps = hostedAppsMap.get(hostname); + Set<String> apps = hostedAppsMap.get(hostname); for (String appId : apps) { // Add a new cluster aggregate metric if none exists TimelineClusterMetric appTimelineClusterMetric = http://git-wip-us.apache.org/repos/asf/ambari/blob/646fb429/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java index b26d3f0..ec141e7 100644 --- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; @@ -50,7 +51,9 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre // 1 minute client side buffering adjustment private final Long serverTimeShiftAdjustment; + public TimelineMetricClusterAggregatorSecond(String aggregatorName, + TimelineMetricMetadataManager metadataManager, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, @@ -65,7 +68,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam, tableName, outputTableName, nativeTimeRangeDelay); - appAggregator = new TimelineMetricAppAggregator(metricsConf); + appAggregator = new TimelineMetricAppAggregator(metadataManager, metricsConf); this.timeSliceIntervalMillis = timeSliceInterval; this.serverTimeShiftAdjustment = 
Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000")); } http://git-wip-us.apache.org/repos/asf/ambari/blob/646fb429/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataKey.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataKey.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataKey.java new file mode 100644 index 0000000..ec97ee5 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataKey.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery; + +public class TimelineMetricMetadataKey { + String metricName; + String appId; + + public TimelineMetricMetadataKey(String metricName, String appId) { + this.metricName = metricName; + this.appId = appId; + } + + public String getMetricName() { + return metricName; + } + + public String getAppId() { + return appId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TimelineMetricMetadataKey that = (TimelineMetricMetadataKey) o; + + if (!metricName.equals(that.metricName)) return false; + return !(appId != null ? !appId.equals(that.appId) : that.appId != null); + + } + + @Override + public int hashCode() { + int result = metricName.hashCode(); + result = 31 * result + (appId != null ? appId.hashCode() : 0); + return result; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/646fb429/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java new file mode 100644 index 0000000..1c1a1dc --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.MetadataException; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; + +import java.sql.SQLException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata.MetricType.UNDEFINED; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DISABLE_METRIC_METADATA_MGMT; +import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_INIT_DELAY; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.METRICS_METADATA_SYNC_SCHEDULE_DELAY; + +public class TimelineMetricMetadataManager { + private static final Log LOG = LogFactory.getLog(TimelineMetricMetadataManager.class); + private boolean isDisabled = false; + // Cache all metadata on retrieval + private final Map<TimelineMetricMetadataKey, TimelineMetricMetadata> METADATA_CACHE = new ConcurrentHashMap<>(); + // Map to lookup apps on a host + private final Map<String, Set<String>> HOSTED_APPS_MAP = new ConcurrentHashMap<>(); + // Sync only when needed + AtomicBoolean SYNC_HOSTED_APPS_METADATA = new AtomicBoolean(false); + + // Single thread to sync back new writes to the store + private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + + private PhoenixHBaseAccessor hBaseAccessor; + private Configuration metricsConf; + + public TimelineMetricMetadataManager(PhoenixHBaseAccessor hBaseAccessor, + Configuration metricsConf) { + this.hBaseAccessor = hBaseAccessor; + this.metricsConf = metricsConf; + } + + /** + * Initialize Metadata from the store + */ + public void initializeMetadata() { + if (metricsConf.getBoolean(DISABLE_METRIC_METADATA_MGMT, false)) { + isDisabled = true; + } else { + // Schedule the executor to sync to store + executorService.scheduleWithFixedDelay(new TimelineMetricMetadataSync(this), + metricsConf.getInt(METRICS_METADATA_SYNC_INIT_DELAY, 120), // 2 minutes + metricsConf.getInt(METRICS_METADATA_SYNC_SCHEDULE_DELAY, 300), // 5 minutes + TimeUnit.SECONDS); + // Read from store and initialize map + try { + Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata = + hBaseAccessor.getTimelineMetricMetadata(); + + LOG.info("Retrieved " + metadata.size() + ", metadata objects from store."); + // 
Store in the cache + METADATA_CACHE.putAll(metadata); + + Map<String, Set<String>> hostedAppData = hBaseAccessor.getHostedAppsMetadata(); + + LOG.info("Retrieved " + hostedAppData.size() + " host objects from store."); + HOSTED_APPS_MAP.putAll(hostedAppData); + + } catch (SQLException e) { + LOG.warn("Exception loading metric metadata", e); + } + } + } + + public Map<TimelineMetricMetadataKey, TimelineMetricMetadata> getMetadataCache() { + return METADATA_CACHE; + } + + public Map<String, Set<String>> getHostedAppsCache() { + return HOSTED_APPS_MAP; + } + + public boolean syncHostedAppsMetadata() { + return SYNC_HOSTED_APPS_METADATA.get(); + } + + public void markSuccessOnSyncHostedAppsMetadata() { + SYNC_HOSTED_APPS_METADATA.set(false); + } + + /** + * Update value in metadata cache + * @param metadata @TimelineMetricMetadata + */ + public void putIfModifiedTimelineMetricMetadata(TimelineMetricMetadata metadata) { + TimelineMetricMetadataKey key = new TimelineMetricMetadataKey( + metadata.getMetricName(), metadata.getAppId()); + + TimelineMetricMetadata metadataFromCache = METADATA_CACHE.get(key); + + if (metadataFromCache != null) { + try { + if (metadataFromCache.needsToBeSynced(metadata)) { + metadata.setIsPersisted(false); // Set the flag to ensure sync to store on next run + METADATA_CACHE.put(key, metadata); + } + } catch (MetadataException e) { + LOG.warn("Error inserting Metadata in cache.", e); + } + + } else { + METADATA_CACHE.put(key, metadata); + } + } + + /** + * Update value in hosted apps cache + * @param hostname Host name + * @param appId Application Id + */ + public void putIfModifiedHostedAppsMetadata(String hostname, String appId) { + Set<String> apps = HOSTED_APPS_MAP.get(hostname); + if (apps == null) { + apps = new HashSet<>(); + HOSTED_APPS_MAP.put(hostname, apps); + } + + if (!apps.contains(appId)) { + apps.add(appId); + SYNC_HOSTED_APPS_METADATA.set(true); + } + } + + public void persistMetadata(Collection<TimelineMetricMetadata> 
metadata) throws SQLException { + hBaseAccessor.saveMetricMetadata(metadata); + } + + public void persistHostedAppsMetadata(Map<String, Set<String>> hostedApps) throws SQLException { + hBaseAccessor.saveHostAppsMetadata(hostedApps); + } + + public TimelineMetricMetadata getTimelineMetricMetadata(TimelineMetric timelineMetric) { + return new TimelineMetricMetadata( + timelineMetric.getMetricName(), + timelineMetric.getAppId(), + timelineMetric.getType(), // Present type and unit are synonyms + UNDEFINED, // TODO: Add support for types in the application + timelineMetric.getStartTime(), + true + ); + } + + /** + * Fetch hosted apps from store + * @throws SQLException + */ + Map<String, Set<String>> getPersistedHostedAppsData() throws SQLException { + return hBaseAccessor.getHostedAppsMetadata(); + } + + public boolean isDisabled() { + return isDisabled; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/646fb429/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java new file mode 100644 index 0000000..54ea200 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataSync.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Sync metadata info with the store + */ +public class TimelineMetricMetadataSync implements Runnable { + private static final Log LOG = LogFactory.getLog(TimelineMetricMetadataSync.class); + + private final TimelineMetricMetadataManager cacheManager; + + public TimelineMetricMetadataSync(TimelineMetricMetadataManager cacheManager) { + this.cacheManager = cacheManager; + } + + @Override + public void run() { + List<TimelineMetricMetadata> metadataToPersist = new ArrayList<>(); + // Find all entries to persist + for (TimelineMetricMetadata metadata : cacheManager.getMetadataCache().values()) { + if (!metadata.isPersisted()) { + metadataToPersist.add(metadata); + } + } + boolean markSuccess = false; + if (!metadataToPersist.isEmpty()) { + try { + cacheManager.persistMetadata(metadataToPersist); + markSuccess = true; + } catch (SQLException e) { + 
LOG.warn("Error persisting metadata.", e); + } + } + // Mark corresponding entries as persisted to skip on next run + if (markSuccess) { + for (TimelineMetricMetadata metadata : metadataToPersist) { + TimelineMetricMetadataKey key = new TimelineMetricMetadataKey( + metadata.getMetricName(), metadata.getAppId() + ); + + // Mark entry as being persisted + metadata.setIsPersisted(true); + // Update cache + cacheManager.getMetadataCache().put(key, metadata); + } + } + // Sync hosted apps data is needed + if (cacheManager.syncHostedAppsMetadata()) { + Map<String, Set<String>> persistedData = null; + try { + persistedData = cacheManager.getPersistedHostedAppsData(); + } catch (SQLException e) { + LOG.warn("Failed on fetching hosted apps data from store.", e); + return; // Something wrong with store + } + + Map<String, Set<String>> cachedData = cacheManager.getHostedAppsCache(); + Map<String, Set<String>> dataToSync = new HashMap<>(); + if (cachedData != null && !cachedData.isEmpty()) { + for (Map.Entry<String, Set<String>> cacheEntry : cachedData.entrySet()) { + // No persistence / stale data in store + if (persistedData == null || persistedData.isEmpty() || + !persistedData.containsKey(cacheEntry.getKey()) || + !persistedData.get(cacheEntry.getKey()).containsAll(cacheEntry.getValue())) { + dataToSync.put(cacheEntry.getKey(), cacheEntry.getValue()); + } + } + try { + cacheManager.persistHostedAppsMetadata(dataToSync); + cacheManager.markSuccessOnSyncHostedAppsMetadata(); + + } catch (SQLException e) { + LOG.warn("Error persisting hosted apps metadata.", e); + } + } + + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/646fb429/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java index fa9fd73..cd1bfb3 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java @@ -22,10 +22,12 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.metrics2.sink.timeline.Precision; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.Collection; import java.util.concurrent.TimeUnit; /** @@ -102,6 +104,23 @@ public class PhoenixTransactSQL { "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + "TTL=%s, COMPRESSION='%s'"; + public static final String CREATE_METRICS_METADATA_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS METRICS_METADATA " + + "(METRIC_NAME VARCHAR, " + + "APP_ID VARCHAR, " + + "UNITS CHAR(20), " + + "TYPE CHAR(20), " + + "START_TIME UNSIGNED_LONG, " + + "SUPPORTS_AGGREGATION BOOLEAN " + + "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID)) " + + "DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'"; + + public static final String CREATE_HOSTED_APPS_METADATA_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS HOSTED_APPS_METADATA " + + "(HOSTNAME VARCHAR, APP_IDS VARCHAR, " + + 
"CONSTRAINT pk PRIMARY KEY (HOSTNAME))" + + "DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'"; + /** * ALTER table to set new options */ @@ -148,6 +167,14 @@ public class PhoenixTransactSQL { "METRIC_COUNT) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + public static final String UPSERT_METADATA_SQL = + "UPSERT INTO METRICS_METADATA (METRIC_NAME, APP_ID, UNITS, TYPE, " + + "START_TIME, SUPPORTS_AGGREGATION) " + + "VALUES (?, ?, ?, ?, ?, ?)"; + + public static final String UPSERT_HOSTED_APPS_METADATA_SQL = + "UPSERT INTO HOSTED_APPS_METADATA (HOSTNAME, APP_IDS) VALUES (?, ?)"; + /** * Retrieve a set of rows from metrics records table. */ @@ -217,6 +244,13 @@ public class PhoenixTransactSQL { "METRIC_MIN " + "FROM %s"; + public static final String GET_METRIC_METADATA_SQL = "SELECT " + + "METRIC_NAME, APP_ID, UNITS, TYPE, START_TIME, " + + "SUPPORTS_AGGREGATION FROM METRICS_METADATA"; + + public static final String GET_HOSTED_APPS_METADATA_SQL = "SELECT " + + "HOSTNAME, APP_IDS FROM HOSTED_APPS_METADATA"; + /** * Aggregate host metrics using a GROUP BY clause to take advantage of * N - way parallel scan where N = number of regions. 
@@ -491,8 +525,7 @@ public class PhoenixTransactSQL { } private static PreparedStatement setQueryParameters(PreparedStatement stmt, - Condition condition) - throws SQLException { + Condition condition) throws SQLException { int pos = 1; //For GET_LATEST_METRIC_SQL_SINGLE_HOST parameters should be set 2 times do { http://git-wip-us.apache.org/repos/asf/ambari/blob/646fb429/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java index 51535b2..e9d77cc 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java @@ -20,11 +20,13 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp; import com.google.inject.Inject; import com.google.inject.Singleton; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import 
org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; @@ -65,6 +67,7 @@ import java.util.Collection; import java.util.EnumSet; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -149,15 +152,15 @@ public class TimelineWebServices { TimelineEntities entities = null; try { entities = store.getEntities( - parseStr(entityType), - parseLongStr(limit), - parseLongStr(windowStart), - parseLongStr(windowEnd), - parseStr(fromId), - parseLongStr(fromTs), - parsePairStr(primaryFilter, ":"), - parsePairsStr(secondaryFilter, ",", ":"), - parseFieldsStr(fields, ",")); + parseStr(entityType), + parseLongStr(limit), + parseLongStr(windowStart), + parseLongStr(windowEnd), + parseStr(fromId), + parseLongStr(fromTs), + parsePairStr(primaryFilter, ":"), + parsePairsStr(secondaryFilter, ",", ":"), + parseFieldsStr(fields, ",")); } catch (NumberFormatException e) { throw new BadRequestException( "windowStart, windowEnd or limit is not a numeric value."); @@ -339,11 +342,11 @@ public class TimelineWebServices { * @param precision Precision [ seconds, minutes, hours ] * @param limit limit on total number of {@link TimelineMetric} records * retrieved. 
- * @return {@link TimelineMetrics} + * @return {@link @TimelineMetrics} */ @GET @Path("/metrics") - @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) + @Produces({ MediaType.APPLICATION_JSON }) public TimelineMetrics getTimelineMetrics( @Context HttpServletRequest req, @Context HttpServletResponse res, @@ -387,11 +390,41 @@ public class TimelineWebServices { throw new WebApplicationException(sql, Response.Status.INTERNAL_SERVER_ERROR); } catch (IOException io) { - throw new WebApplicationException(io, - Response.Status.INTERNAL_SERVER_ERROR); + throw new WebApplicationException(io, Response.Status.INTERNAL_SERVER_ERROR); } } + @GET + @Path("/metrics/metadata") + @Produces({ MediaType.APPLICATION_JSON }) + public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata( + @Context HttpServletRequest req, + @Context HttpServletResponse res + ) { + init(res); + + try { + return timelineMetricStore.getTimelineMetricMetadata(); + } catch (Exception e) { + throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } + + @GET + @Path("/metrics/hosts") + @Produces({ MediaType.APPLICATION_JSON }) + public Map<String, Set<String>> getHostedAppsMetadata( + @Context HttpServletRequest req, + @Context HttpServletResponse res + ) { + init(res); + + try { + return timelineMetricStore.getHostAppsMetadata(); + } catch (Exception e) { + throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } /** * Store the given entities into the timeline store, and return the errors http://git-wip-us.apache.org/repos/asf/ambari/blob/646fb429/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java index a8bbc73..524ed2b 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java @@ -44,6 +44,8 @@ import java.net.URL; import java.net.URLClassLoader; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.Statement; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics @@ -156,15 +158,22 @@ public class TestApplicationHistoryServer { Connection connection = createNiceMock(Connection.class); Statement stmt = createNiceMock(Statement.class); + PreparedStatement preparedStatement = createNiceMock(PreparedStatement.class); + ResultSet rs = createNiceMock(ResultSet.class); mockStatic(DriverManager.class); expect(DriverManager.getConnection("jdbc:phoenix:localhost:2181:/ams-hbase-unsecure")) .andReturn(connection).anyTimes(); expect(connection.createStatement()).andReturn(stmt).anyTimes(); + expect(connection.prepareStatement(anyString())).andReturn(preparedStatement).anyTimes(); suppress(method(Statement.class, "executeUpdate", String.class)); + expect(preparedStatement.executeQuery()).andReturn(rs).anyTimes(); + expect(rs.next()).andReturn(false).anyTimes(); + preparedStatement.close(); + expectLastCall().anyTimes(); connection.close(); expectLastCall(); - EasyMock.replay(connection, stmt); + EasyMock.replay(connection, stmt, preparedStatement, rs); replayAll(); historyServer = new 
ApplicationHistoryServer(); http://git-wip-us.apache.org/repos/asf/ambari/blob/646fb429/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java index e73c741..8cbc56b 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java @@ -99,10 +99,16 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest { stmt = conn.createStatement(); stmt.execute("delete from METRIC_AGGREGATE"); + stmt.execute("delete from METRIC_AGGREGATE_MINUTE"); stmt.execute("delete from METRIC_AGGREGATE_HOURLY"); + stmt.execute("delete from METRIC_AGGREGATE_DAILY"); stmt.execute("delete from METRIC_RECORD"); - stmt.execute("delete from METRIC_RECORD_HOURLY"); stmt.execute("delete from METRIC_RECORD_MINUTE"); + stmt.execute("delete from METRIC_RECORD_HOURLY"); + stmt.execute("delete from METRIC_RECORD_DAILY"); + stmt.execute("delete from METRICS_METADATA"); + stmt.execute("delete from HOSTED_APPS_METADATA"); + conn.commit(); } finally { if (stmt != null) { http://git-wip-us.apache.org/repos/asf/ambari/blob/646fb429/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java 
---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java index 5e7234c..0522f81 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java @@ -27,11 +27,13 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; import org.junit.After; import org.junit.Before; import org.junit.Test; + import java.io.IOException; import java.sql.Connection; import java.sql.SQLException; @@ -41,6 +43,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; + import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertTrue; import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric; @@ -204,7 +207,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { public void testGetClusterMetricRecordsSeconds() throws Exception { // GIVEN TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, new Configuration()); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond( + hdb, new Configuration(), new TimelineMetricMetadataManager(hdb, new Configuration())); long startTime = System.currentTimeMillis(); long ctime = startTime + 1; @@ -243,7 +247,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { public void testGetClusterMetricRecordLatestWithFunction() throws Exception { // GIVEN TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, new Configuration()); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond + (hdb, new Configuration(), new TimelineMetricMetadataManager(hdb, new Configuration())); long startTime = System.currentTimeMillis(); long ctime = startTime + 1; http://git-wip-us.apache.org/repos/asf/ambari/blob/646fb429/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java index 7c8138b..8f8067b 100644 --- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java @@ -19,13 +19,17 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.TreeMap; public class TestTimelineMetricStore implements TimelineMetricStore { @@ -80,4 +84,14 @@ public class TestTimelineMetricStore implements TimelineMetricStore { return new TimelinePutResponse(); } + + @Override + public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata() throws SQLException, IOException { + return null; + } + + @Override + public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException { + return Collections.emptyMap(); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/646fb429/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java index 6672dae..f201224 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; @@ -75,7 +76,8 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { public void testShouldAggregateClusterProperly() throws Exception { // GIVEN TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false)); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, + getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration())); TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); 
long startTime = System.currentTimeMillis(); @@ -127,7 +129,8 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { public void testShouldAggregateClusterIgnoringInstance() throws Exception { // GIVEN TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false)); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, + getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration())); TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); long startTime = System.currentTimeMillis(); @@ -202,7 +205,8 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { public void testShouldAggregateDifferentMetricsOnClusterProperly() throws Exception { // GIVEN TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false)); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, + getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration())); TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); // here we put some metrics tha will be aggregated @@ -485,7 +489,8 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { Configuration conf = getConfigurationForTest(false); conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1"); TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, conf); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, + conf, new TimelineMetricMetadataManager(hdb, new Configuration())); TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); long startTime = System.currentTimeMillis(); @@ -536,7 +541,8 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { @Test public void 
testClusterAggregateMetricNormalization() throws Exception { TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false)); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, + getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration())); TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); // Sample data