Repository: ambari Updated Branches: refs/heads/trunk 41a41e64b -> c0da0a1d9
AMBARI-11010. Aggregator function does not work for point in time service component metric query. (swagle) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c0da0a1d Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c0da0a1d Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c0da0a1d Branch: refs/heads/trunk Commit: c0da0a1d9b295b2ac7f0ac5638b255852e816feb Parents: 41a41e6 Author: Siddharth Wagle <swa...@hortonworks.com> Authored: Fri May 8 10:08:16 2015 -0700 Committer: Siddharth Wagle <swa...@hortonworks.com> Committed: Fri May 8 10:42:26 2015 -0700 ---------------------------------------------------------------------- .../timeline/SingleValuedTimelineMetric.java | 107 +++++++++++ .../metrics2/sink/timeline/TimelineMetrics.java | 26 +++ .../timeline/HBaseTimelineMetricStore.java | 3 +- .../metrics/timeline/PhoenixHBaseAccessor.java | 191 +++++++------------ .../TimelineMetricHostAggregator.java | 9 +- .../aggregators/TimelineMetricReadHelper.java | 29 ++- .../metrics/timeline/ITClusterAggregator.java | 1 - .../metrics/timeline/ITMetricAggregator.java | 20 +- .../timeline/ITPhoenixHBaseAccessor.java | 52 ++++- 9 files changed, 292 insertions(+), 146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/c0da0a1d/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java new file mode 100644 index 0000000..8ecca54 --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.sink.timeline; + +/** + * This class prevents creating a TreeMap for every instantiation of a metric + * read from the store. The methods are meant to provide interoperability + * with @TimelineMetric + */ +public class SingleValuedTimelineMetric { + private Long timestamp; + private Double value; + private String metricName; + private String appId; + private String instanceId; + private String hostName; + private Long startTime; + private String type; + + public void setSingleTimeseriesValue(Long timestamp, Double value) { + this.timestamp = timestamp; + this.value = value; + } + + public SingleValuedTimelineMetric(String metricName, String appId, + String instanceId, String hostName, + long timestamp, long startTime, String type) { + this.metricName = metricName; + this.appId = appId; + this.instanceId = instanceId; + this.hostName = hostName; + this.timestamp = timestamp; + this.startTime = startTime; + this.type = type; + } + + public Long getTimestamp() { + return timestamp; + } + + public long getStartTime() { + return startTime; + } + + public String getType() { + return type; + } + + public Double getValue() { + return value; + } + + public String getMetricName() { + return metricName; + } + + public String getAppId() { + return appId; + } + + public String getInstanceId() { + return instanceId; + } + + public String getHostName() { + return hostName; + } + + public boolean equalsExceptTime(TimelineMetric metric) { + if (!metricName.equals(metric.getMetricName())) return false; + if (hostName != null ? !hostName.equals(metric.getHostName()) : metric.getHostName() != null) + return false; + if (appId != null ? !appId.equals(metric.getAppId()) : metric.getAppId() != null) + return false; + if (instanceId != null ? !instanceId.equals(metric.getInstanceId()) : metric.getInstanceId() != null) return false; + + return true; + } + + public TimelineMetric getTimelineMetric() { + TimelineMetric metric = new TimelineMetric(); + metric.setMetricName(this.metricName); + metric.setAppId(this.appId); + metric.setHostName(this.hostName); + metric.setType(this.type); + metric.setInstanceId(this.instanceId); + metric.setStartTime(this.startTime); + metric.setTimestamp(this.timestamp); + metric.getMetricValues().put(timestamp, value); + return metric; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/c0da0a1d/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java index 3eb0e89..383079a 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java @@ -99,4 +99,30 @@ public class TimelineMetrics { allMetrics.add(metric); } } + + // Optimization that addresses too many TreeMaps from getting created. + public void addOrMergeTimelineMetric(SingleValuedTimelineMetric metric) { + TimelineMetric metricToMerge = null; + + if (!allMetrics.isEmpty()) { + for (TimelineMetric timelineMetric : allMetrics) { + if (metric.equalsExceptTime(timelineMetric)) { + metricToMerge = timelineMetric; + break; + } + } + } + + if (metricToMerge != null) { + metricToMerge.getMetricValues().put(metric.getTimestamp(), metric.getValue()); + if (metricToMerge.getTimestamp() > metric.getTimestamp()) { + metricToMerge.setTimestamp(metric.getTimestamp()); + } + if (metricToMerge.getStartTime() > metric.getStartTime()) { + metricToMerge.setStartTime(metric.getStartTime()); + } + } else { + allMetrics.add(metric.getTimelineMetric()); + } + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/c0da0a1d/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java index 447f6f9..f5d6bc0 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -191,8 +191,7 @@ public class HBaseTimelineMetricStore extends AbstractService return metricValues; } - public static HashMap<String, List<Function>> - parseMetricNamesToAggregationFunctions(List<String> metricNames) { + public static HashMap<String, List<Function>> parseMetricNamesToAggregationFunctions(List<String> metricNames) { HashMap<String, List<Function>> metricsFunctions = new HashMap<String, List<Function>>(); http://git-wip-us.apache.org/repos/asf/ambari/blob/c0da0a1d/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index 7258cad..b890171 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.RetryCounterFactory; +import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils; @@ -39,7 +40,6 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.phoenix.exception.SQLExceptionCode; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; - import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; @@ -50,9 +50,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.TreeMap; import java.util.concurrent.TimeUnit; - import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL; @@ -132,23 +130,23 @@ public class PhoenixHBaseAccessor { private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs) throws SQLException, IOException { - TimelineMetric metric = TIMELINE_METRIC_READ_HELPER - .getTimelineMetricCommonsFromResultSet(rs); + TimelineMetric metric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricCommonsFromResultSet(rs); metric.setMetricValues(readLastMetricValueFromJSON(rs.getString("METRICS"))); - return metric; } - public static TimelineMetric getAggregatedTimelineMetricFromResultSet( - ResultSet rs, Function f) throws SQLException, IOException { + public static SingleValuedTimelineMetric getAggregatedTimelineMetricFromResultSet( + ResultSet rs, Function f) throws SQLException, IOException { - TimelineMetric metric = new TimelineMetric(); - metric.setHostName(rs.getString("HOSTNAME")); - metric.setAppId(rs.getString("APP_ID")); - metric.setInstanceId(rs.getString("INSTANCE_ID")); - metric.setTimestamp(rs.getLong("SERVER_TIME")); - metric.setStartTime(rs.getLong("SERVER_TIME")); - metric.setType(rs.getString("UNITS")); + SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric( + rs.getString("METRIC_NAME") + f.getSuffix(), + rs.getString("APP_ID"), + rs.getString("INSTANCE_ID"), + rs.getString("HOSTNAME"), + rs.getLong("SERVER_TIME"), + rs.getLong("SERVER_TIME"), + rs.getString("UNITS") + ); // get functions for metricnames @@ -171,11 +169,8 @@ public class PhoenixHBaseAccessor { break; } - metric.setMetricName(rs.getString("METRIC_NAME") + f.getSuffix()); + metric.setSingleTimeseriesValue(rs.getLong("SERVER_TIME"), value); - Map<Long, Double> valueMap = new TreeMap<Long, Double>(); - valueMap.put(rs.getLong("SERVER_TIME"), value); - metric.setMetricValues(valueMap); return metric; } @@ -194,30 +189,6 @@ public class PhoenixHBaseAccessor { return (Map<Long, Double>) mapper.readValue(json, metricValuesTypeRef); } - public static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs) - throws SQLException, IOException { - TimelineMetric metric = new TimelineMetric(); - metric.setMetricName(rs.getString("METRIC_NAME")); - metric.setAppId(rs.getString("APP_ID")); - metric.setInstanceId(rs.getString("INSTANCE_ID")); - metric.setHostName(rs.getString("HOSTNAME")); - metric.setTimestamp(rs.getLong("SERVER_TIME")); - metric.setType(rs.getString("UNITS")); - return metric; - } - - public static MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs) - throws SQLException { - MetricHostAggregate metricHostAggregate = new MetricHostAggregate(); - metricHostAggregate.setSum(rs.getDouble("METRIC_SUM")); - metricHostAggregate.setMax(rs.getDouble("METRIC_MAX")); - metricHostAggregate.setMin(rs.getDouble("METRIC_MIN")); - metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT")); - - metricHostAggregate.setDeviation(0.0); - return metricHostAggregate; - } - private Connection getConnectionRetryingOnException() throws SQLException, InterruptedException { RetryCounter retryCounter = retryCounterFactory.create(); @@ -465,9 +436,9 @@ public class PhoenixHBaseAccessor { } private void appendMetricFromResultSet( - TimelineMetrics metrics, Condition condition, Map<String, - List<Function>> metricFunctions, ResultSet rs) - throws SQLException, IOException { + TimelineMetrics metrics, Condition condition, Map<String, + List<Function>> metricFunctions, ResultSet rs) + throws SQLException, IOException { if (condition.getPrecision() == Precision.HOURS || condition.getPrecision() == Precision.MINUTES) { @@ -475,14 +446,12 @@ public class PhoenixHBaseAccessor { List<Function> functions = metricFunctions.get(metricName); for (Function f : functions) { - TimelineMetric metric; - - metric = getAggregatedTimelineMetricFromResultSet(rs, f); + SingleValuedTimelineMetric metric = getAggregatedTimelineMetricFromResultSet(rs, f); if (condition.isGrouped()) { metrics.addOrMergeTimelineMetric(metric); } else { - metrics.getMetrics().add(metric); + metrics.getMetrics().add(metric.getTimelineMetric()); } } } @@ -540,10 +509,8 @@ public class PhoenixHBaseAccessor { * @return @TimelineMetrics * @throws SQLException */ - public TimelineMetrics getAggregateMetricRecords( - final Condition condition, - Map<String, List<Function>> metricFunctions) - throws SQLException { + public TimelineMetrics getAggregateMetricRecords(final Condition condition, + Map<String, List<Function>> metricFunctions) throws SQLException { validateConditionIsNotEmpty(condition); @@ -555,14 +522,13 @@ public class PhoenixHBaseAccessor { try { //get latest if(condition.isPointInTime()) { - stmt = getLatestAggregateMetricRecords(condition, conn, metrics); + stmt = getLatestAggregateMetricRecords(condition, conn, metrics, metricFunctions); } else { stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition); rs = stmt.executeQuery(); while (rs.next()) { - appendAggregateMetricFromResultSet(metrics, condition, - metricFunctions, rs); + appendAggregateMetricFromResultSet(metrics, condition, metricFunctions, rs); } } } finally { @@ -593,34 +559,34 @@ public class PhoenixHBaseAccessor { return metrics; } - private void appendAggregateMetricFromResultSet( - TimelineMetrics metrics, Condition condition, - Map<String, List<Function>> metricFunctions, ResultSet rs) - throws SQLException { + private void appendAggregateMetricFromResultSet(TimelineMetrics metrics, + Condition condition, Map<String, List<Function>> metricFunctions, + ResultSet rs) throws SQLException { String metricName = rs.getString("METRIC_NAME"); List<Function> functions = metricFunctions.get(metricName); for (Function aggregateFunction : functions) { - TimelineMetric metric; + SingleValuedTimelineMetric metric; - if (condition.getPrecision() == Precision.HOURS) { - metric = getAggregateHoursTimelineMetricFromResultSet(rs, aggregateFunction); + if (condition.getPrecision() == Precision.HOURS + || condition.getPrecision() == Precision.DAYS) { + metric = getAggregateTimelineMetricFromResultSet(rs, aggregateFunction, false); } else { - metric = getAggregateTimelineMetricFromResultSet(rs, aggregateFunction); + metric = getAggregateTimelineMetricFromResultSet(rs, aggregateFunction, true); } if (condition.isGrouped()) { metrics.addOrMergeTimelineMetric(metric); } else { - metrics.getMetrics().add(metric); + metrics.getMetrics().add(metric.getTimelineMetric()); } } } - private PreparedStatement getLatestAggregateMetricRecords( - Condition condition, Connection conn, TimelineMetrics metrics) - throws SQLException { + private PreparedStatement getLatestAggregateMetricRecords(Condition condition, + Connection conn, TimelineMetrics metrics, + Map<String, List<Function>> metricFunctions) throws SQLException { PreparedStatement stmt = null; SplitByMetricNamesCondition splitCondition = @@ -629,15 +595,28 @@ public class PhoenixHBaseAccessor { for (String metricName: splitCondition.getOriginalMetricNames()) { splitCondition.setCurrentMetric(metricName); - stmt = PhoenixTransactSQL.prepareGetLatestAggregateMetricSqlStmt(conn, - splitCondition); + stmt = PhoenixTransactSQL.prepareGetLatestAggregateMetricSqlStmt(conn, splitCondition); ResultSet rs = null; try { rs = stmt.executeQuery(); while (rs.next()) { - TimelineMetric metric = getAggregateTimelineMetricFromResultSet(rs, - new Function()); - metrics.getMetrics().add(metric); + List<Function> functions = metricFunctions.get(metricName); + if (functions != null) { + for (Function f : functions) { + SingleValuedTimelineMetric metric = + getAggregateTimelineMetricFromResultSet(rs, f, true); + + if (condition.isGrouped()) { + metrics.addOrMergeTimelineMetric(metric); + } else { + metrics.getMetrics().add(metric.getTimelineMetric()); + } + } + } else { + SingleValuedTimelineMetric metric = + getAggregateTimelineMetricFromResultSet(rs, new Function(), true); + metrics.getMetrics().add(metric.getTimelineMetric()); + } } } finally { if (rs != null) { @@ -653,54 +632,28 @@ public class PhoenixHBaseAccessor { return stmt; } - private TimelineMetric getAggregateTimelineMetricFromResultSet( - ResultSet rs, Function f) throws SQLException { - TimelineMetric metric = new TimelineMetric(); - metric.setAppId(rs.getString("APP_ID")); - metric.setInstanceId(rs.getString("INSTANCE_ID")); - metric.setTimestamp(rs.getLong("SERVER_TIME")); - metric.setStartTime(rs.getLong("SERVER_TIME")); + private SingleValuedTimelineMetric getAggregateTimelineMetricFromResultSet(ResultSet rs, + Function f, boolean useHostCount) throws SQLException { - double value; - switch(f.getReadFunction()){ - case AVG: - value = rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT"); - break; - case MIN: - value = rs.getDouble("METRIC_MIN"); - break; - case MAX: - value = rs.getDouble("METRIC_MAX"); - break; - case SUM: - value = rs.getDouble("METRIC_SUM"); - break; - default: - value = rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT"); - break; + String countColumnName = "METRIC_COUNT"; + if (useHostCount) { + countColumnName = "HOSTS_COUNT"; } - metric.setMetricName(rs.getString("METRIC_NAME") + f.getSuffix()); - - Map<Long, Double> valueMap = new TreeMap<Long, Double>(); - valueMap.put(rs.getLong("SERVER_TIME"), value); - metric.setMetricValues(valueMap); - - return metric; - } - - private TimelineMetric getAggregateHoursTimelineMetricFromResultSet( - ResultSet rs, Function f) throws SQLException { - TimelineMetric metric = new TimelineMetric(); - metric.setAppId(rs.getString("APP_ID")); - metric.setInstanceId(rs.getString("INSTANCE_ID")); - metric.setTimestamp(rs.getLong("SERVER_TIME")); - metric.setStartTime(rs.getLong("SERVER_TIME")); + SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric( + rs.getString("METRIC_NAME") + f.getSuffix(), + rs.getString("APP_ID"), + rs.getString("INSTANCE_ID"), + null, + rs.getLong("SERVER_TIME"), + rs.getLong("SERVER_TIME"), + rs.getString("UNITS") + ); double value; switch(f.getReadFunction()){ case AVG: - value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT"); + value = rs.getDouble("METRIC_SUM") / rs.getInt(countColumnName); break; case MIN: value = rs.getDouble("METRIC_MIN"); @@ -712,15 +665,11 @@ public class PhoenixHBaseAccessor { value = rs.getDouble("METRIC_SUM"); break; default: - value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT"); + value = rs.getDouble("METRIC_SUM") / rs.getInt(countColumnName); break; } - metric.setMetricName(rs.getString("METRIC_NAME") + f.getSuffix()); - - Map<Long, Double> valueMap = new TreeMap<Long, Double>(); - valueMap.put(rs.getLong("SERVER_TIME"), value); - metric.setMetricValues(valueMap); + metric.setSingleTimeseriesValue(rs.getLong("SERVER_TIME"), value); return metric; } http://git-wip-us.apache.org/repos/asf/ambari/blob/c0da0a1d/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java index 18e5f18..796cb72 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java @@ -34,6 +34,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { private static final Log LOG = LogFactory.getLog(TimelineMetricHostAggregator.class); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); public TimelineMetricHostAggregator(PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, @@ -79,14 +80,14 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { throws IOException, SQLException { TimelineMetric existingMetric = null; MetricHostAggregate hostAggregate = null; - Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = - new HashMap<TimelineMetric, MetricHostAggregate>(); + Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = new HashMap<TimelineMetric, MetricHostAggregate>(); + while (rs.next()) { TimelineMetric currentMetric = - PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs); + readHelper.getTimelineMetricKeyFromResultSet(rs); MetricHostAggregate currentHostAggregate = - PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs); + readHelper.getMetricHostAggregateFromResultSet(rs); if (existingMetric == null) { // First row http://git-wip-us.apache.org/repos/asf/ambari/blob/c0da0a1d/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java index 25f8c62..398f4c3 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java @@ -38,7 +38,7 @@ public class TimelineMetricReadHelper { } public TimelineMetric getTimelineMetricFromResultSet(ResultSet rs) - throws SQLException, IOException { + throws SQLException, IOException { TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs); Map<Long, Double> sortedByTimeMetrics = new TreeMap<Long, Double>( PhoenixHBaseAccessor.readMetricFromJSON(rs.getString("METRICS"))); @@ -50,7 +50,7 @@ public class TimelineMetricReadHelper { * Returns common part of timeline metrics record without the values. */ public TimelineMetric getTimelineMetricCommonsFromResultSet(ResultSet rs) - throws SQLException { + throws SQLException { TimelineMetric metric = new TimelineMetric(); metric.setMetricName(rs.getString("METRIC_NAME")); metric.setAppId(rs.getString("APP_ID")); @@ -76,7 +76,7 @@ public class TimelineMetricReadHelper { } public MetricClusterAggregate getMetricClusterTimeAggregateFromResultSet(ResultSet rs) - throws SQLException { + throws SQLException { MetricClusterAggregate agg = new MetricClusterAggregate(); agg.setSum(rs.getDouble("METRIC_SUM")); agg.setMax(rs.getDouble("METRIC_MAX")); @@ -98,5 +98,28 @@ public class TimelineMetricReadHelper { rs.getString("UNITS")); } + public MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs) + throws SQLException { + MetricHostAggregate metricHostAggregate = new MetricHostAggregate(); + metricHostAggregate.setSum(rs.getDouble("METRIC_SUM")); + metricHostAggregate.setMax(rs.getDouble("METRIC_MAX")); + metricHostAggregate.setMin(rs.getDouble("METRIC_MIN")); + metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT")); + + metricHostAggregate.setDeviation(0.0); + return metricHostAggregate; + } + + public TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs) + throws SQLException, IOException { + TimelineMetric metric = new TimelineMetric(); + metric.setMetricName(rs.getString("METRIC_NAME")); + metric.setAppId(rs.getString("APP_ID")); + metric.setInstanceId(rs.getString("INSTANCE_ID")); + metric.setHostName(rs.getString("HOSTNAME")); + metric.setTimestamp(rs.getLong("SERVER_TIME")); + metric.setType(rs.getString("UNITS")); + return metric; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/c0da0a1d/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java index aa276f3..fb3bc30 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java @@ -135,7 +135,6 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { } } - @Test public void testShouldAggregateClusterIgnoringInstance() throws Exception { // GIVEN http://git-wip-us.apache.org/repos/asf/ambari/blob/c0da0a1d/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java index f4f2223..5f646fe 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java @@ -23,6 +23,7 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; @@ -107,6 +108,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { // GIVEN TimelineMetricAggregator aggregatorMinute = TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb, new Configuration()); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); long startTime = System.currentTimeMillis(); long ctime = startTime; @@ -137,9 +139,9 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { int count = 0; while (rs.next()) { TimelineMetric currentMetric = - PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs); + readHelper.getTimelineMetricKeyFromResultSet(rs); MetricHostAggregate currentHostAggregate = - PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs); + readHelper.getMetricHostAggregateFromResultSet(rs); if ("disk_free".equals(currentMetric.getMetricName())) { assertEquals(2.0, currentHostAggregate.getMax()); @@ -167,6 +169,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { // GIVEN TimelineMetricAggregator aggregator = TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb, new Configuration()); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); long startTime = System.currentTimeMillis(); MetricHostAggregate expectedAggregate = @@ -210,9 +213,9 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { while (rs.next()) { TimelineMetric currentMetric = - PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs); + readHelper.getTimelineMetricKeyFromResultSet(rs); MetricHostAggregate currentHostAggregate = - PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs); + readHelper.getMetricHostAggregateFromResultSet(rs); if ("disk_used".equals(currentMetric.getMetricName())) { assertEquals(2.0, currentHostAggregate.getMax()); @@ -229,6 +232,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { // GIVEN TimelineMetricAggregator aggregator = TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hdb, new Configuration()); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); long startTime = System.currentTimeMillis(); MetricHostAggregate expectedAggregate = @@ -270,9 +274,9 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { while (rs.next()) { TimelineMetric currentMetric = - PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs); + readHelper.getTimelineMetricKeyFromResultSet(rs); MetricHostAggregate currentHostAggregate = - PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs); + readHelper.getMetricHostAggregateFromResultSet(rs); if ("disk_used".equals(currentMetric.getMetricName())) { assertEquals(2.0, currentHostAggregate.getMax()); @@ -301,9 +305,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { return metrics; } - private TimelineMetric createMetric(long startTime, - String metricName, - String host) { + private TimelineMetric createMetric(long startTime, String metricName, String host) { TimelineMetric m = new TimelineMetric(); m.setAppId("host"); m.setHostName(host); http://git-wip-us.apache.org/repos/asf/ambari/blob/c0da0a1d/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java index 4c56f77..00db767 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java @@ -86,20 +86,20 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { long ctime = startTime; long minute = 60 * 1000; hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", - "disk_free", 1)); + "disk_free", 1)); hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", - "disk_free", 2)); + "disk_free", 2)); ctime += minute; hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", - "disk_free", 2)); + "disk_free", 2)); hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", - "disk_free", 1)); + "disk_free", 1)); // WHEN long endTime = ctime + minute; Condition condition = new DefaultCondition( - Collections.singletonList("disk_free"), "local1", null, null, startTime, - endTime, Precision.SECONDS, null, true); + Collections.singletonList("disk_free"), "local1", null, null, startTime, + endTime, Precision.SECONDS, null, true); TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition, singletonValueFunctionMap("disk_free")); @@ -240,6 +240,46 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { } @Test + public void testGetClusterMetricRecordLatestWithFunction() throws Exception { + // GIVEN + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration()); + + long startTime = System.currentTimeMillis(); + long ctime = startTime + 1; + long minute = 60 * 1000; + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", + "disk_free", 1)); + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", + "disk_free", 2)); + ctime += minute; + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", + "disk_free", 2)); + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", + "disk_free", 1)); + + long endTime = ctime + minute + 1; + boolean success = agg.doWork(startTime, endTime); + assertTrue(success); + + // WHEN + Condition condition = new DefaultCondition( + Collections.singletonList("disk_free"), null, null, null, + null, null, Precision.SECONDS, null, true); + TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition, + Collections.singletonMap("disk_free", + Collections.singletonList(new Function(Function.ReadFunction.SUM, null)))); + + //THEN + assertEquals(1, timelineMetrics.getMetrics().size()); + TimelineMetric metric = timelineMetrics.getMetrics().get(0); + + assertEquals("disk_free._sum", metric.getMetricName()); + assertEquals(1, metric.getMetricValues().size()); + assertEquals(3, metric.getMetricValues().values().iterator().next().intValue()); + } + + @Test public void testGetClusterMetricRecordsHours() throws Exception { // GIVEN TimelineMetricAggregator agg =