Repository: ambari Updated Branches: refs/heads/trunk 2557d9a8f -> 67c425acf
http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java new file mode 100644 index 0000000..293608e --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorMinute.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; + +/** + * Aggregates a metric across all hosts in the cluster. Reads metrics from + * the precision table and saves into the aggregate. 
+ */ +public class TimelineMetricClusterAggregatorMinute extends AbstractTimelineAggregator { + private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregatorMinute.class); + public Long timeSliceIntervalMillis; + private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true); + // Aggregator to perform app-level aggregates for host metrics + private final TimelineMetricAppAggregator appAggregator; + + public TimelineMetricClusterAggregatorMinute(PhoenixHBaseAccessor hBaseAccessor, + Configuration metricsConf, + String checkpointLocation, + Long sleepIntervalMillis, + Integer checkpointCutOffMultiplier, + String aggregatorDisabledParam, + String tableName, + String outputTableName, + Long nativeTimeRangeDelay, + Long timeSliceInterval) { + super(hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, + checkpointCutOffMultiplier, aggregatorDisabledParam, tableName, + outputTableName, nativeTimeRangeDelay); + + appAggregator = new TimelineMetricAppAggregator(metricsConf); + this.timeSliceIntervalMillis = timeSliceInterval; + } + + @Override + protected void aggregate(ResultSet rs, long startTime, long endTime) throws SQLException, IOException { + List<Long[]> timeSlices = getTimeSlices(startTime, endTime); + // Initialize app aggregates for host metrics + appAggregator.init(); + Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = + aggregateMetricsFromResultSet(rs, timeSlices); + + LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates."); + hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics); + appAggregator.cleanup(); + } + + @Override + protected Condition prepareMetricQueryCondition(long startTime, long endTime) { + Condition condition = new DefaultCondition(null, null, null, null, startTime, + endTime, null, null, true); + condition.setNoLimit(); + condition.setFetchSize(resultsetFetchSize); + 
condition.setStatement(String.format(GET_METRIC_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), + METRICS_RECORD_TABLE_NAME)); + condition.addOrderByColumn("METRIC_NAME"); + condition.addOrderByColumn("APP_ID"); + condition.addOrderByColumn("INSTANCE_ID"); + condition.addOrderByColumn("SERVER_TIME"); + return condition; + } + + private List<Long[]> getTimeSlices(long startTime, long endTime) { + List<Long[]> timeSlices = new ArrayList<Long[]>(); + long sliceStartTime = startTime; + while (sliceStartTime < endTime) { + timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis}); + sliceStartTime += timeSliceIntervalMillis; + } + return timeSlices; + } + + private Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices) + throws SQLException, IOException { + Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = + new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); + // Create time slices + + while (rs.next()) { + TimelineMetric metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs); + + Map<TimelineClusterMetric, Double> clusterMetrics = + sliceFromTimelineMetric(metric, timeSlices); + + if (clusterMetrics != null && !clusterMetrics.isEmpty()) { + for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry : + clusterMetrics.entrySet()) { + + TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey(); + Double avgValue = clusterMetricEntry.getValue(); + + MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric); + + if (aggregate == null) { + aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue); + aggregateClusterMetrics.put(clusterMetric, aggregate); + } else { + aggregate.updateSum(avgValue); + aggregate.updateNumberOfHosts(1); + aggregate.updateMax(avgValue); + aggregate.updateMin(avgValue); + } + // Update app level aggregates + 
appAggregator.processTimelineClusterMetric(clusterMetric, + metric.getHostName(), avgValue); + } + } + } + // Add app level aggregates to save + aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics()); + return aggregateClusterMetrics; + } + + private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric( + TimelineMetric timelineMetric, List<Long[]> timeSlices) { + + if (timelineMetric.getMetricValues().isEmpty()) { + return null; + } + + Map<TimelineClusterMetric, Double> timelineClusterMetricMap = + new HashMap<TimelineClusterMetric, Double>(); + + for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) { + // TODO: investigate null values - pre filter + if (metric.getValue() == null) { + continue; + } + Long timestamp = getSliceTimeForMetric(timeSlices, + Long.parseLong(metric.getKey().toString())); + if (timestamp != -1) { + // Metric is within desired time range + TimelineClusterMetric clusterMetric = new TimelineClusterMetric( + timelineMetric.getMetricName(), + timelineMetric.getAppId(), + timelineMetric.getInstanceId(), + timestamp, + timelineMetric.getType()); + if (!timelineClusterMetricMap.containsKey(clusterMetric)) { + timelineClusterMetricMap.put(clusterMetric, metric.getValue()); + } else { + Double oldValue = timelineClusterMetricMap.get(clusterMetric); + Double newValue = (oldValue + metric.getValue()) / 2; + timelineClusterMetricMap.put(clusterMetric, newValue); + } + } + } + + return timelineClusterMetricMap; + } + + /** + * Return beginning of the time slice into which the metric fits. 
+ */ + private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) { + for (Long[] timeSlice : timeSlices) { + if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) { + return timeSlice[0]; + } + } + return -1l; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java new file mode 100644 index 0000000..18e5f18 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL; + +public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { + private static final Log LOG = LogFactory.getLog(TimelineMetricHostAggregator.class); + + public TimelineMetricHostAggregator(PhoenixHBaseAccessor hBaseAccessor, + Configuration metricsConf, + String checkpointLocation, + Long sleepIntervalMillis, + Integer checkpointCutOffMultiplier, + String hostAggregatorDisabledParam, + String tableName, + String outputTableName, + Long nativeTimeRangeDelay) { + super(hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, + checkpointCutOffMultiplier, hostAggregatorDisabledParam, tableName, + outputTableName, nativeTimeRangeDelay); + } + + @Override + protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException { + + Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs); + + LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates."); + 
hBaseAccessor.saveHostAggregateRecords(hostAggregateMap, outputTableName); + } + + @Override + protected Condition prepareMetricQueryCondition(long startTime, long endTime) { + Condition condition = new DefaultCondition(null, null, null, null, startTime, + endTime, null, null, true); + condition.setNoLimit(); + condition.setFetchSize(resultsetFetchSize); + condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay), + tableName)); + condition.addOrderByColumn("METRIC_NAME"); + condition.addOrderByColumn("HOSTNAME"); + condition.addOrderByColumn("APP_ID"); + condition.addOrderByColumn("INSTANCE_ID"); + condition.addOrderByColumn("SERVER_TIME"); + return condition; + } + + private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs) + throws IOException, SQLException { + TimelineMetric existingMetric = null; + MetricHostAggregate hostAggregate = null; + Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = + new HashMap<TimelineMetric, MetricHostAggregate>(); + + while (rs.next()) { + TimelineMetric currentMetric = + PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs); + MetricHostAggregate currentHostAggregate = + PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs); + + if (existingMetric == null) { + // First row + existingMetric = currentMetric; + hostAggregate = new MetricHostAggregate(); + hostAggregateMap.put(currentMetric, hostAggregate); + } + + if (existingMetric.equalsExceptTime(currentMetric)) { + // Recalculate totals with current metric + hostAggregate.updateAggregates(currentHostAggregate); + } else { + // Switched over to a new metric - save existing - create new aggregate + hostAggregate = new MetricHostAggregate(); + hostAggregate.updateAggregates(currentHostAggregate); + hostAggregateMap.put(currentMetric, hostAggregate); + existingMetric = currentMetric; + } + } + return hostAggregateMap; + } + + +} 
http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java index 40a9648..25f8c62 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java @@ -62,5 +62,41 @@ public class TimelineMetricReadHelper { return metric; } + public MetricClusterAggregate getMetricClusterAggregateFromResultSet(ResultSet rs) + throws SQLException { + MetricClusterAggregate agg = new MetricClusterAggregate(); + agg.setSum(rs.getDouble("METRIC_SUM")); + agg.setMax(rs.getDouble("METRIC_MAX")); + agg.setMin(rs.getDouble("METRIC_MIN")); + agg.setNumberOfHosts(rs.getInt("HOSTS_COUNT")); + + agg.setDeviation(0.0); + + return agg; + } + + public MetricClusterAggregate getMetricClusterTimeAggregateFromResultSet(ResultSet rs) + throws SQLException { + MetricClusterAggregate agg = new MetricClusterAggregate(); + agg.setSum(rs.getDouble("METRIC_SUM")); + agg.setMax(rs.getDouble("METRIC_MAX")); + agg.setMin(rs.getDouble("METRIC_MIN")); + agg.setNumberOfHosts(rs.getInt("METRIC_COUNT")); + + agg.setDeviation(0.0); + + return agg; + } + + + public TimelineClusterMetric fromResultSet(ResultSet rs) 
throws SQLException { + return new TimelineClusterMetric( + rs.getString("METRIC_NAME"), + rs.getString("APP_ID"), + ignoreInstance ? null : rs.getString("INSTANCE_ID"), + rs.getLong("SERVER_TIME"), + rs.getString("UNITS")); + } + } http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java index 636999f..a630e77 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java @@ -58,24 +58,8 @@ public class PhoenixTransactSQL { "INSTANCE_ID)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + "TTL=%s, COMPRESSION='%s'"; - public static final String CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL = - "CREATE TABLE IF NOT EXISTS METRIC_RECORD_HOURLY " + - "(METRIC_NAME VARCHAR, " + - "HOSTNAME VARCHAR, " + - "APP_ID VARCHAR, " + - "INSTANCE_ID VARCHAR, " + - "SERVER_TIME UNSIGNED_LONG NOT NULL, " + - "UNITS CHAR(20), " + - "METRIC_SUM DOUBLE," + - "METRIC_COUNT UNSIGNED_INT, " + - "METRIC_MAX DOUBLE," + - "METRIC_MIN DOUBLE CONSTRAINT pk " + - "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " + - "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + - "TTL=%s, COMPRESSION='%s'"; - - public static final String 
CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL = - "CREATE TABLE IF NOT EXISTS METRIC_RECORD_MINUTE " + + public static final String CREATE_METRICS_AGGREGATE_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS %s " + "(METRIC_NAME VARCHAR, " + "HOSTNAME VARCHAR, " + "APP_ID VARCHAR, " + @@ -91,7 +75,7 @@ public class PhoenixTransactSQL { " COMPRESSION='%s'"; public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL = - "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE " + + "CREATE TABLE IF NOT EXISTS %s " + "(METRIC_NAME VARCHAR, " + "APP_ID VARCHAR, " + "INSTANCE_ID VARCHAR, " + @@ -105,8 +89,9 @@ public class PhoenixTransactSQL { "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + "TTL=%s, COMPRESSION='%s'"; + // HOSTS_COUNT vs METRIC_COUNT public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL = - "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE_HOURLY " + + "CREATE TABLE IF NOT EXISTS %s " + "(METRIC_NAME VARCHAR, " + "APP_ID VARCHAR, " + "INSTANCE_ID VARCHAR, " + @@ -139,7 +124,7 @@ public class PhoenixTransactSQL { "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " + - "METRIC_AGGREGATE (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " + + "%s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " + "UNITS, " + "METRIC_SUM, " + "HOSTS_COUNT, " + @@ -156,7 +141,6 @@ public class PhoenixTransactSQL { "METRIC_MIN) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; - public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " + "%s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " + "SERVER_TIME, " + @@ -198,25 +182,29 @@ public class PhoenixTransactSQL { "METRIC_MIN " + "FROM %s"; - public static final String GET_CLUSTER_AGGREGATE_HOURLY_SQL = "SELECT %s " + - "METRIC_NAME, APP_ID, " + - "INSTANCE_ID, SERVER_TIME, " + - "UNITS, " + - "METRIC_SUM, " + - "METRIC_COUNT, " + - "METRIC_MAX, " + - "METRIC_MIN " + - "FROM %s"; + public static final String GET_CLUSTER_AGGREGATE_TIME_SQL = 
"SELECT %s " + + "METRIC_NAME, APP_ID, " + + "INSTANCE_ID, SERVER_TIME, " + + "UNITS, " + + "METRIC_SUM, " + + "METRIC_COUNT, " + + "METRIC_MAX, " + + "METRIC_MIN " + + "FROM %s"; public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD"; public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME = "METRIC_RECORD_MINUTE"; public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME = "METRIC_RECORD_HOURLY"; + public static final String METRICS_AGGREGATE_DAILY_TABLE_NAME = + "METRIC_RECORD_DAILY"; public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME = "METRIC_AGGREGATE"; public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME = "METRIC_AGGREGATE_HOURLY"; + public static final String METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME = + "METRIC_AGGREGATE_DAILY"; public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY"; public static final String DEFAULT_ENCODING = "FAST_DIFF"; public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes @@ -250,7 +238,11 @@ public class PhoenixTransactSQL { long endTime = condition.getEndTime() == null ? System.currentTimeMillis() : condition.getEndTime(); long startTime = condition.getStartTime() == null ? 
0 : condition.getStartTime(); Long timeRange = endTime - startTime; - if (timeRange > 5 * DAY) { + if (timeRange > 7 * DAY) { + metricsTable = METRICS_AGGREGATE_DAILY_TABLE_NAME; + query = GET_METRIC_AGGREGATE_ONLY_SQL; + condition.setPrecision(Precision.DAYS); + } else if (timeRange < 7 * DAY && timeRange > DAY) { metricsTable = METRICS_AGGREGATE_HOURLY_TABLE_NAME; query = GET_METRIC_AGGREGATE_ONLY_SQL; condition.setPrecision(Precision.HOURS); @@ -265,6 +257,10 @@ public class PhoenixTransactSQL { } } else { switch (condition.getPrecision()) { + case DAYS: + metricsTable = METRICS_AGGREGATE_DAILY_TABLE_NAME; + query = GET_METRIC_AGGREGATE_ONLY_SQL; + break; case HOURS: metricsTable = METRICS_AGGREGATE_HOURLY_TABLE_NAME; query = GET_METRIC_AGGREGATE_ONLY_SQL; @@ -462,9 +458,13 @@ public class PhoenixTransactSQL { long endTime = condition.getEndTime() == null ? System.currentTimeMillis() : condition.getEndTime(); long startTime = condition.getStartTime() == null ? 0 : condition.getStartTime(); Long timeRange = endTime - startTime; - if (timeRange > 5 * DAY) { + if (timeRange > 7 * DAY) { + metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME; + queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL; + condition.setPrecision(Precision.DAYS); + } else if (timeRange < 7 * DAY && timeRange > DAY) { metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; - queryStmt = GET_CLUSTER_AGGREGATE_HOURLY_SQL; + queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL; condition.setPrecision(Precision.HOURS); } else { metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME; @@ -473,9 +473,13 @@ public class PhoenixTransactSQL { } } else { switch (condition.getPrecision()) { + case DAYS: + metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME; + queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL; + break; case HOURS: metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; - queryStmt = GET_CLUSTER_AGGREGATE_HOURLY_SQL; + queryStmt = 
GET_CLUSTER_AGGREGATE_TIME_SQL; break; default: metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME; @@ -483,6 +487,10 @@ public class PhoenixTransactSQL { } } + queryStmt = String.format(queryStmt, + getNaiveTimeRangeHint(condition.getStartTime(), NATIVE_TIME_RANGE_DELTA), + metricsAggregateTable); + StringBuilder sb = new StringBuilder(queryStmt); sb.append(" WHERE "); sb.append(condition.getConditionClause()); @@ -491,9 +499,7 @@ public class PhoenixTransactSQL { sb.append(" LIMIT ").append(condition.getLimit()); } - String query = String.format(sb.toString(), - PhoenixTransactSQL.getNaiveTimeRangeHint(condition.getStartTime(), - NATIVE_TIME_RANGE_DELTA), metricsAggregateTable); + String query = sb.toString(); if (LOG.isDebugEnabled()) { LOG.debug("SQL => " + query + ", condition => " + condition); } http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java index 90c03e4..6cfaa2e 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider; import org.apache.phoenix.hbase.index.write.IndexWriterUtils; import org.apache.phoenix.query.BaseTest; @@ -49,11 +50,11 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest { @BeforeClass public static void doSetup() throws Exception { Map<String, String> props = getDefaultProps(); + props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, "false"); props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000)); props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100)); // Make a small batch size to test multiple calls to reserve sequences - props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, - Long.toString(BATCH_SIZE)); + props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, Long.toString(BATCH_SIZE)); // Must update config before starting server setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java index 27e9d67..aa276f3 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java +++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java @@ -20,17 +20,18 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetricReader; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregator; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregatorHourly; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; + import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -39,6 +40,7 @@ import java.sql.Statement; import java.util.Collections; import java.util.HashMap; import 
java.util.Map; + import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertNotNull; import static junit.framework.Assert.fail; @@ -46,14 +48,14 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { private Connection conn; private PhoenixHBaseAccessor hdb; - private final TimelineClusterMetricReader metricReader = new - TimelineClusterMetricReader(false); + private final TimelineMetricReadHelper metricReader = new TimelineMetricReadHelper(false); @Before public void setUp() throws Exception { @@ -83,8 +85,9 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { @Test public void testShouldAggregateClusterProperly() throws Exception { // GIVEN - TimelineMetricClusterAggregator agg = - new TimelineMetricClusterAggregator(hdb, new Configuration()); + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration()); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); long startTime = 
System.currentTimeMillis(); long ctime = startTime; @@ -118,7 +121,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { while (rs.next()) { TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs); MetricClusterAggregate currentHostAggregate = - PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs); + readHelper.getMetricClusterAggregateFromResultSet(rs); if ("disk_free".equals(currentMetric.getMetricName())) { assertEquals(2, currentHostAggregate.getNumberOfHosts()); @@ -136,8 +139,9 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { @Test public void testShouldAggregateClusterIgnoringInstance() throws Exception { // GIVEN - TimelineMetricClusterAggregator agg = - new TimelineMetricClusterAggregator(hdb, new Configuration()); + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration()); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); long startTime = System.currentTimeMillis(); long ctime = startTime; @@ -188,7 +192,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs); // PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs); MetricClusterAggregate currentHostAggregate = - PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs); + readHelper.getMetricClusterAggregateFromResultSet(rs); if ("disk_free".equals(currentMetric.getMetricName())) { System.out.println("OUTPUT: " + currentMetric+" - " + @@ -205,11 +209,11 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { } @Test - public void testShouldAggregateDifferentMetricsOnClusterProperly() - throws Exception { + public void testShouldAggregateDifferentMetricsOnClusterProperly() throws Exception { // GIVEN - TimelineMetricClusterAggregator agg = - new TimelineMetricClusterAggregator(hdb, new Configuration()); + 
TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration()); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); // here we put some metrics that will be aggregated long startTime = System.currentTimeMillis(); @@ -249,7 +253,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { while (rs.next()) { TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs); MetricClusterAggregate currentHostAggregate = - PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs); + readHelper.getMetricClusterAggregateFromResultSet(rs); if ("disk_free".equals(currentMetric.getMetricName())) { assertEquals(2, currentHostAggregate.getNumberOfHosts()); @@ -269,12 +273,56 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { } } + @Test + public void testAggregateDailyClusterMetrics() throws Exception { + // GIVEN + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hdb, new Configuration()); + + // this time can be virtualized! 
or made independent from real clock + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long hour = 3600 * 1000; + + Map<TimelineClusterMetric, MetricHostAggregate> records = + new HashMap<TimelineClusterMetric, MetricHostAggregate>(); + + records.put(createEmptyTimelineClusterMetric(ctime), + MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0)); + records.put(createEmptyTimelineClusterMetric(ctime += hour), + MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0)); + records.put(createEmptyTimelineClusterMetric(ctime += hour), + MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0)); + records.put(createEmptyTimelineClusterMetric(ctime += hour), + MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0)); + + + hdb.saveClusterTimeAggregateRecords(records, METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME); + + // WHEN + agg.doWork(startTime, ctime + hour + 1000); + + // THEN + ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_DAILY"); + int count = 0; + while (rs.next()) { + assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME")); + assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); + assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM")); + assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); + assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); + assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); + count++; + } + + assertEquals("Day aggregated row expected ", 1, count); + } @Test public void testShouldAggregateClusterOnHourProperly() throws Exception { // GIVEN - TimelineMetricClusterAggregatorHourly agg = - new TimelineMetricClusterAggregatorHourly(hdb, new Configuration()); + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, new Configuration()); // this time can be virtualized! 
or made independent from real clock long startTime = System.currentTimeMillis(); @@ -315,11 +363,10 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { } @Test - public void testShouldAggregateDifferentMetricsOnHourProperly() throws - Exception { + public void testShouldAggregateDifferentMetricsOnHourProperly() throws Exception { // GIVEN - TimelineMetricClusterAggregatorHourly agg = - new TimelineMetricClusterAggregatorHourly(hdb, new Configuration()); + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, new Configuration()); long startTime = System.currentTimeMillis(); long ctime = startTime; @@ -381,7 +428,9 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { public void testAppLevelHostMetricAggregates() throws Exception { Configuration conf = new Configuration(); conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1"); - TimelineMetricClusterAggregator agg = new TimelineMetricClusterAggregator(hdb, conf); + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, conf); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); long startTime = System.currentTimeMillis(); long ctime = startTime; @@ -416,7 +465,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { MetricClusterAggregate currentHostAggregate = null; while (rs.next()) { currentMetric = metricReader.fromResultSet(rs); - currentHostAggregate = PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs); + currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs); recordCount++; } assertEquals(4, recordCount); http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java ---------------------------------------------------------------------- 
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java index 66420d9..f4f2223 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.junit.After; import org.junit.Before; import org.junit.Test; + import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -37,15 +38,17 @@ import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; import java.util.Map; + import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertTrue; import static junit.framework.Assert.fail; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineMetric; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; import static org.assertj.core.api.Assertions.assertThat; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineMetric; public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { private Connection conn; @@ -102,10 +105,8 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { @Test public void testShouldAggregateMinuteProperly() throws Exception { // GIVEN -// TimelineMetricAggregatorMinute aggregatorMinute = -// new TimelineMetricAggregatorMinute(hdb, new Configuration()); - TimelineMetricAggregator aggregatorMinute = TimelineMetricAggregatorFactory - .createTimelineMetricAggregatorMinute(hdb, new Configuration()); + TimelineMetricAggregator aggregatorMinute = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb, new Configuration()); long startTime = System.currentTimeMillis(); long ctime = startTime; @@ -162,17 +163,14 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { } @Test - public void testShouldAggregateHourProperly() throws Exception { + public void testShouldAggregateHourProperly() throws Exception { // GIVEN -// TimelineMetricAggregatorHourly aggregator = -// new TimelineMetricAggregatorHourly(hdb, new Configuration()); - - TimelineMetricAggregator aggregator = TimelineMetricAggregatorFactory - .createTimelineMetricAggregatorHourly(hdb, new Configuration()); + TimelineMetricAggregator aggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb, new Configuration()); long startTime = System.currentTimeMillis(); MetricHostAggregate expectedAggregate = - MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0); + MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0); Map<TimelineMetric, MetricHostAggregate> aggMap = new HashMap<TimelineMetric, 
MetricHostAggregate>(); @@ -226,6 +224,66 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { } } + @Test + public void testMetricAggregateDaily() throws Exception { + // GIVEN + TimelineMetricAggregator aggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hdb, new Configuration()); + long startTime = System.currentTimeMillis(); + + MetricHostAggregate expectedAggregate = + MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0); + Map<TimelineMetric, MetricHostAggregate> + aggMap = new HashMap<TimelineMetric, MetricHostAggregate>(); + + int min_5 = 5 * 60 * 1000; + long ctime = startTime - min_5; + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + + hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_HOURLY_TABLE_NAME); + + //WHEN + long endTime = ctime + min_5; + boolean success = aggregator.doWork(startTime, endTime); + assertTrue(success); + + //THEN + Condition condition = new DefaultCondition(null, null, null, null, startTime, + endTime, null, null, true); + condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, + 
PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), + METRICS_AGGREGATE_DAILY_TABLE_NAME)); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + ResultSet rs = pstmt.executeQuery(); + + while (rs.next()) { + TimelineMetric currentMetric = + PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs); + MetricHostAggregate currentHostAggregate = + PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs); + + if ("disk_used".equals(currentMetric.getMetricName())) { + assertEquals(2.0, currentHostAggregate.getMax()); + assertEquals(0.0, currentHostAggregate.getMin()); + assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples()); + assertEquals(12 * 15.0, currentHostAggregate.getSum()); + assertEquals(15.0 / 20, currentHostAggregate.getAvg()); + } + } + } + private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR = new Comparator<TimelineMetric>() { @Override http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java index 11c4d0f..4c56f77 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java @@ -26,15 +26,11 @@ import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregator; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregatorHourly; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; import org.junit.After; import org.junit.Before; import org.junit.Test; - import java.io.IOException; import java.sql.Connection; import java.sql.SQLException; @@ -44,14 +40,14 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; - import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertTrue; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineMetric; import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createMetricHostAggregate; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { @@ -119,8 +115,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { @Test public void testGetMetricRecordsMinutes() throws IOException, SQLException { // GIVEN - TimelineMetricAggregator aggregatorMinute = TimelineMetricAggregatorFactory - .createTimelineMetricAggregatorMinute(hdb, new Configuration()); + TimelineMetricAggregator aggregatorMinute = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb, new Configuration()); long startTime = System.currentTimeMillis(); long ctime = startTime; @@ -156,8 +152,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { @Test public void testGetMetricRecordsHours() throws IOException, SQLException { // GIVEN - TimelineMetricAggregator aggregator = TimelineMetricAggregatorFactory - .createTimelineMetricAggregatorHourly(hdb, new Configuration()); + TimelineMetricAggregator aggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb, new Configuration()); MetricHostAggregate expectedAggregate = createMetricHostAggregate(2.0, 0.0, 20, 15.0); @@ -207,8 +203,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { @Test public void 
testGetClusterMetricRecordsSeconds() throws Exception { // GIVEN - TimelineMetricClusterAggregator agg = - new TimelineMetricClusterAggregator(hdb, new Configuration()); + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, new Configuration()); long startTime = System.currentTimeMillis(); long ctime = startTime + 1; @@ -246,8 +242,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { @Test public void testGetClusterMetricRecordsHours() throws Exception { // GIVEN - TimelineMetricClusterAggregatorHourly agg = - new TimelineMetricClusterAggregatorHourly(hdb, new Configuration()); + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, new Configuration()); long startTime = System.currentTimeMillis(); long ctime = startTime; @@ -286,9 +282,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { assertEquals(2.0, metric.getMetricValues().values().iterator().next(), 0.00001); } - private Map<String, List<Function>> singletonValueFunctionMap(String - metricName) { - return Collections.singletonMap(metricName, Collections.singletonList - (new Function())); + private Map<String, List<Function>> singletonValueFunctionMap(String metricName) { + return Collections.singletonMap(metricName, Collections.singletonList(new Function())); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java index bdcd6df..26771d7 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java @@ -28,11 +28,8 @@ import java.util.Map; public class MetricTestHelper { - public static MetricHostAggregate - createMetricHostAggregate(double max, double min, int numberOfSamples, - double sum) { - MetricHostAggregate expectedAggregate = - new MetricHostAggregate(); + public static MetricHostAggregate createMetricHostAggregate(double max, double min, int numberOfSamples, double sum) { + MetricHostAggregate expectedAggregate = new MetricHostAggregate(); expectedAggregate.setMax(max); expectedAggregate.setMin(min); expectedAggregate.setNumberOfSamples(numberOfSamples); http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java index d6c1814..44f48e8 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java +++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java @@ -53,8 +53,8 @@ public class TestMetricHostAggregate { assertThat(aggregate.getAvg()).isEqualTo((3.0 + 8.0 + 1.0) / 5); } - private MetricHostAggregate createAggregate - (double sum, double min, double max, int samplesCount) { + static MetricHostAggregate createAggregate (Double sum, Double min, + Double max, Integer samplesCount) { MetricHostAggregate aggregate = new MetricHostAggregate(); aggregate.setSum(sum); aggregate.setMax(max); http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml index 74c4304..fc52e5a 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml @@ -53,6 +53,14 @@ </description> </property> <property> + <name>timeline.metrics.host.aggregator.minute.interval</name> + <value>120</value> + <description> + Time in seconds to sleep for the minute resolution host based + aggregator. Default resolution is 2 minutes. 
+ </description> + </property> + <property> <name>timeline.metrics.host.aggregator.hourly.interval</name> <value>3600</value> <description> @@ -61,11 +69,11 @@ </description> </property> <property> - <name>timeline.metrics.host.aggregator.minute.interval</name> - <value>120</value> + <name>timeline.metrics.host.aggregator.daily.interval</name> + <value>86400</value> <description> - Time in seconds to sleep for the minute resolution host based - aggregator. Default resolution is 2 minutes. + Time in seconds to sleep for the day resolution host based + aggregator. Default resolution is 24 hours. </description> </property> <property> @@ -77,6 +85,14 @@ </description> </property> <property> + <name>timeline.metrics.cluster.aggregator.daily.interval</name> + <value>86400</value> + <description> + Time in seconds to sleep for the day resolution cluster wide + aggregator. Default is 24 hours. + </description> + </property> + <property> <name>timeline.metrics.cluster.aggregator.minute.interval</name> <value>120</value> <description> @@ -85,6 +101,15 @@ </description> </property> <property> + <name>timeline.metrics.host.aggregator.daily.checkpointCutOffMultiplier</name> + <value>1</value> + <description> + Multiplier value * interval = Max allowed checkpoint lag. Effectively + if aggregator checkpoint is greater than max allowed checkpoint delay, + the checkpoint will be discarded by the aggregator. + </description> + </property> + <property> <name>timeline.metrics.host.aggregator.hourly.checkpointCutOffMultiplier</name> <value>2</value> <description> @@ -121,6 +146,22 @@ </description> </property> <property> + <name>timeline.metrics.cluster.aggregator.daily.checkpointCutOffMultiplier</name> + <value>1</value> + <description> + Multiplier value * interval = Max allowed checkpoint lag. Effectively + if aggregator checkpoint is greater than max allowed checkpoint delay, + the checkpoint will be discarded by the aggregator. 
+ </description> + </property> + <property> + <name>timeline.metrics.host.aggregator.daily.disabled</name> + <value>false</value> + <description> + Disable host based daily aggregations. + </description> + </property> + <property> <name>timeline.metrics.host.aggregator.hourly.disabled</name> <value>false</value> <description> @@ -135,6 +176,13 @@ </description> </property> <property> + <name>timeline.metrics.cluster.aggregator.daily.disabled</name> + <value>false</value> + <description> + Disable cluster based daily aggregations. + </description> + </property> + <property> <name>timeline.metrics.cluster.aggregator.hourly.disabled</name> <value>false</value> <description> @@ -156,6 +204,13 @@ </description> </property> <property> + <name>timeline.metrics.host.aggregator.daily.ttl</name> + <value>31536000</value> + <description> + Host based daily resolution data purge interval. Default is 1 year. + </description> + </property> + <property> <name>timeline.metrics.host.aggregator.hourly.ttl</name> <value>2592000</value> <description> @@ -184,6 +239,13 @@ </description> </property> <property> + <name>timeline.metrics.cluster.aggregator.daily.ttl</name> + <value>63072000</value> + <description> + Cluster wide daily resolution data purge interval. Default is 2 years. + </description> + </property> + <property> <name>timeline.metrics.host.aggregator.ttl</name> <value>86400</value> <description>