AMBARI-10622. Add daily aggregation to AMS (useful for reporting over months of data). (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/67c425ac Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/67c425ac Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/67c425ac Branch: refs/heads/trunk Commit: 67c425acfd22dcd701d05eab16f22d423db045dd Parents: 2557d9a Author: Siddharth Wagle <swa...@hortonworks.com> Authored: Tue Apr 21 09:51:58 2015 -0700 Committer: Siddharth Wagle <swa...@hortonworks.com> Committed: Tue Apr 21 09:51:58 2015 -0700 ---------------------------------------------------------------------- .../ambari-metrics-timelineservice/pom.xml | 1 + .../timeline/HBaseTimelineMetricStore.java | 40 ++-- .../metrics/timeline/PhoenixHBaseAccessor.java | 144 ++++++------ .../metrics/timeline/Precision.java | 6 +- .../timeline/TimelineMetricConfiguration.java | 29 ++- .../aggregators/AbstractTimelineAggregator.java | 58 ++++- .../TimelineClusterMetricReader.java | 42 ---- .../aggregators/TimelineMetricAggregator.java | 151 ++---------- .../TimelineMetricAggregatorFactory.java | 188 ++++++++++++++- .../TimelineMetricClusterAggregator.java | 235 ++++++------------- .../TimelineMetricClusterAggregatorHourly.java | 175 -------------- .../TimelineMetricClusterAggregatorMinute.java | 201 ++++++++++++++++ .../TimelineMetricHostAggregator.java | 113 +++++++++ .../aggregators/TimelineMetricReadHelper.java | 36 +++ .../timeline/query/PhoenixTransactSQL.java | 82 ++++--- .../timeline/AbstractMiniHBaseClusterTest.java | 5 +- .../metrics/timeline/ITClusterAggregator.java | 99 ++++++-- .../metrics/timeline/ITMetricAggregator.java | 82 ++++++- .../timeline/ITPhoenixHBaseAccessor.java | 34 ++- .../metrics/timeline/MetricTestHelper.java | 7 +- .../timeline/TestMetricHostAggregate.java | 4 +- .../0.1.0/configuration/ams-site.xml | 70 +++++- 22 files changed, 1065 insertions(+), 737 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-timelineservice/pom.xml index 4ec730e..2485661 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml +++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml @@ -249,6 +249,7 @@ <configuration> <redirectTestOutputToFile>true</redirectTestOutputToFile> <forkMode>always</forkMode> + <argLine>-XX:-UseSplitVerifier</argLine> </configuration> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java index 1fac404..447f6f9 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -28,8 +28,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator; import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregator; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregatorHourly; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; @@ -69,39 +67,53 @@ public class HBaseTimelineMetricStore extends AbstractService hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf); hBaseAccessor.initMetricSchema(); - // Start the cluster aggregator - TimelineMetricClusterAggregator minuteClusterAggregator = - new TimelineMetricClusterAggregator(hBaseAccessor, metricsConf); + // Start the cluster aggregator minute + TimelineMetricAggregator minuteClusterAggregator = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf); if (!minuteClusterAggregator.isDisabled()) { Thread aggregatorThread = new Thread(minuteClusterAggregator); aggregatorThread.start(); } - // Start the cluster aggregator hourly - TimelineMetricClusterAggregatorHourly hourlyClusterAggregator = - new TimelineMetricClusterAggregatorHourly(hBaseAccessor, metricsConf); + // Start the hourly cluster aggregator + TimelineMetricAggregator hourlyClusterAggregator = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hBaseAccessor, metricsConf); if (!hourlyClusterAggregator.isDisabled()) { Thread aggregatorThread = new Thread(hourlyClusterAggregator); aggregatorThread.start(); } - // Start the 5 minute aggregator + // Start the daily cluster aggregator + TimelineMetricAggregator dailyClusterAggregator = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hBaseAccessor, metricsConf); + if 
(!dailyClusterAggregator.isDisabled()) { + Thread aggregatorThread = new Thread(dailyClusterAggregator); + aggregatorThread.start(); + } + + // Start the minute host aggregator TimelineMetricAggregator minuteHostAggregator = - TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute - (hBaseAccessor, metricsConf); + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hBaseAccessor, metricsConf); if (!minuteHostAggregator.isDisabled()) { Thread minuteAggregatorThread = new Thread(minuteHostAggregator); minuteAggregatorThread.start(); } - // Start hourly host aggregator + // Start the hourly host aggregator TimelineMetricAggregator hourlyHostAggregator = - TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly - (hBaseAccessor, metricsConf); + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hBaseAccessor, metricsConf); if (!hourlyHostAggregator.isDisabled()) { Thread aggregatorHourlyThread = new Thread(hourlyHostAggregator); aggregatorHourlyThread.start(); } + + // Start the daily host aggregator + TimelineMetricAggregator dailyHostAggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hBaseAccessor, metricsConf); + if (!dailyHostAggregator.isDisabled()) { + Thread aggregatorDailyThread = new Thread(dailyHostAggregator); + aggregatorDailyThread.start(); + } } @Override http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index 2e78912..e27d9a9 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.phoenix.exception.SQLExceptionCode; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; + import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; @@ -51,7 +52,9 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.TimeUnit; + import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES; @@ -59,19 +62,21 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME; import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_DAILY_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION; +import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; @@ -85,13 +90,9 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti */ public class PhoenixHBaseAccessor { + static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000; private static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class); private static final TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper(); - private final Configuration hbaseConf; - private final Configuration metricsConf; - private final RetryCounterFactory retryCounterFactory; - - static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000; /** * 4 metrics/min * 60 * 24: Retrieve data for 1 day. 
*/ @@ -99,9 +100,11 @@ public class PhoenixHBaseAccessor { public static int RESULTSET_LIMIT = (int)TimeUnit.DAYS.toMinutes(1) * METRICS_PER_MINUTE; private static ObjectMapper mapper = new ObjectMapper(); - private static TypeReference<Map<Long, Double>> metricValuesTypeRef = new TypeReference<Map<Long, Double>>() {}; + private final Configuration hbaseConf; + private final Configuration metricsConf; + private final RetryCounterFactory retryCounterFactory; private final ConnectionProvider dataSource; public PhoenixHBaseAccessor(Configuration hbaseConf, @@ -127,37 +130,6 @@ public class PhoenixHBaseAccessor { (int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 5))); } - - private Connection getConnectionRetryingOnException() - throws SQLException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); - while (true) { - try{ - return getConnection(); - } catch (SQLException e) { - if(!retryCounter.shouldRetry()){ - LOG.error("HBaseAccessor getConnection failed after " - + retryCounter.getMaxAttempts() + " attempts"); - throw e; - } - } - retryCounter.sleepUntilNextRetry(); - } - } - - /** - * Get JDBC connection to HBase store. Assumption is that the hbase - * configuration is present on the classpath and loaded by the caller into - * the Configuration object. - * Phoenix already caches the HConnection between the client and HBase - * cluster. 
- * - * @return @java.sql.Connection - */ - public Connection getConnection() throws SQLException { - return dataSource.getConnection(); - } - private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs) throws SQLException, IOException { TimelineMetric metric = TIMELINE_METRIC_READ_HELPER @@ -218,8 +190,7 @@ public class PhoenixHBaseAccessor { } @SuppressWarnings("unchecked") - public static Map<Long, Double> readMetricFromJSON(String json) - throws IOException { + public static Map<Long, Double> readMetricFromJSON(String json) throws IOException { return (Map<Long, Double>) mapper.readValue(json, metricValuesTypeRef); } @@ -247,17 +218,34 @@ public class PhoenixHBaseAccessor { return metricHostAggregate; } - public static MetricClusterAggregate getMetricClusterAggregateFromResultSet(ResultSet rs) - throws SQLException { - MetricClusterAggregate agg = new MetricClusterAggregate(); - agg.setSum(rs.getDouble("METRIC_SUM")); - agg.setMax(rs.getDouble("METRIC_MAX")); - agg.setMin(rs.getDouble("METRIC_MIN")); - agg.setNumberOfHosts(rs.getInt("HOSTS_COUNT")); - - agg.setDeviation(0.0); + private Connection getConnectionRetryingOnException() + throws SQLException, InterruptedException { + RetryCounter retryCounter = retryCounterFactory.create(); + while (true) { + try{ + return getConnection(); + } catch (SQLException e) { + if(!retryCounter.shouldRetry()){ + LOG.error("HBaseAccessor getConnection failed after " + + retryCounter.getMaxAttempts() + " attempts"); + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + } + } - return agg; + /** + * Get JDBC connection to HBase store. Assumption is that the hbase + * configuration is present on the classpath and loaded by the caller into + * the Configuration object. + * Phoenix already caches the HConnection between the client and HBase + * cluster. 
+ * + * @return @java.sql.Connection + */ + public Connection getConnection() throws SQLException { + return dataSource.getConnection(); } protected void initMetricSchema() { @@ -269,24 +257,33 @@ public class PhoenixHBaseAccessor { String precisionTtl = metricsConf.get(PRECISION_TABLE_TTL, "86400"); String hostMinTtl = metricsConf.get(HOST_MINUTE_TABLE_TTL, "604800"); String hostHourTtl = metricsConf.get(HOST_HOUR_TABLE_TTL, "2592000"); + String hostDailyTtl = metricsConf.get(HOST_DAILY_TABLE_TTL, "31536000"); String clusterMinTtl = metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "2592000"); String clusterHourTtl = metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "31536000"); + String clusterDailyTtl = metricsConf.get(CLUSTER_DAILY_TABLE_TTL, "31536000"); try { LOG.info("Initializing metrics schema..."); conn = getConnectionRetryingOnException(); stmt = conn.createStatement(); + // Host level stmt.executeUpdate(String.format(CREATE_METRICS_TABLE_SQL, encoding, precisionTtl, compression)); - stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL, - encoding, hostHourTtl, compression)); - stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL, - encoding, hostMinTtl, compression)); + stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL, + METRICS_AGGREGATE_MINUTE_TABLE_NAME, encoding, hostMinTtl, compression)); + stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL, + METRICS_AGGREGATE_HOURLY_TABLE_NAME, encoding, hostHourTtl, compression)); + stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL, + METRICS_AGGREGATE_DAILY_TABLE_NAME, encoding, hostDailyTtl, compression)); + + // Cluster level stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL, - encoding, clusterMinTtl, compression)); + METRICS_CLUSTER_AGGREGATE_TABLE_NAME, encoding, clusterMinTtl, compression)); + stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL, + 
METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, encoding, clusterHourTtl, compression)); stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL, - encoding, clusterHourTtl, compression)); + METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, encoding, clusterDailyTtl, compression)); //alter TTL options to update tables stmt.executeUpdate(String.format(ALTER_SQL, @@ -299,11 +296,17 @@ public class PhoenixHBaseAccessor { METRICS_AGGREGATE_HOURLY_TABLE_NAME, hostHourTtl)); stmt.executeUpdate(String.format(ALTER_SQL, + METRICS_AGGREGATE_DAILY_TABLE_NAME, + hostDailyTtl)); + stmt.executeUpdate(String.format(ALTER_SQL, METRICS_CLUSTER_AGGREGATE_TABLE_NAME, clusterMinTtl)); stmt.executeUpdate(String.format(ALTER_SQL, METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, clusterHourTtl)); + stmt.executeUpdate(String.format(ALTER_SQL, + METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, + clusterDailyTtl)); conn.commit(); } catch (SQLException sql) { @@ -726,9 +729,8 @@ public class PhoenixHBaseAccessor { } } - public void saveHostAggregateRecords(Map<TimelineMetric, - MetricHostAggregate> hostAggregateMap, String phoenixTableName) - throws SQLException { + public void saveHostAggregateRecords(Map<TimelineMetric, MetricHostAggregate> hostAggregateMap, + String phoenixTableName) throws SQLException { if (hostAggregateMap == null || hostAggregateMap.isEmpty()) { LOG.debug("Empty aggregate records."); @@ -809,9 +811,8 @@ public class PhoenixHBaseAccessor { * * @throws SQLException */ - public void saveClusterAggregateRecords( - Map<TimelineClusterMetric, MetricClusterAggregate> records) - throws SQLException { + public void saveClusterAggregateRecords(Map<TimelineClusterMetric, MetricClusterAggregate> records) + throws SQLException { if (records == null || records.isEmpty()) { LOG.debug("Empty aggregate records."); @@ -819,11 +820,11 @@ public class PhoenixHBaseAccessor { } long start = System.currentTimeMillis(); - + String sqlStr = 
String.format(UPSERT_CLUSTER_AGGREGATE_SQL, METRICS_CLUSTER_AGGREGATE_TABLE_NAME); Connection conn = getConnection(); PreparedStatement stmt = null; try { - stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL); + stmt = conn.prepareStatement(sqlStr); int rowCount = 0; for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> @@ -892,10 +893,8 @@ public class PhoenixHBaseAccessor { * * @throws SQLException */ - public void saveClusterAggregateHourlyRecords( - Map<TimelineClusterMetric, MetricHostAggregate> records, - String tableName) - throws SQLException { + public void saveClusterTimeAggregateRecords(Map<TimelineClusterMetric, MetricHostAggregate> records, + String tableName) throws SQLException { if (records == null || records.isEmpty()) { LOG.debug("Empty aggregate records."); return; @@ -906,12 +905,10 @@ public class PhoenixHBaseAccessor { Connection conn = getConnection(); PreparedStatement stmt = null; try { - stmt = conn.prepareStatement(String.format - (UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName)); + stmt = conn.prepareStatement(String.format(UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName)); int rowCount = 0; - for (Map.Entry<TimelineClusterMetric, MetricHostAggregate> - aggregateEntry : records.entrySet()) { + for (Map.Entry<TimelineClusterMetric, MetricHostAggregate> aggregateEntry : records.entrySet()) { TimelineClusterMetric clusterMetric = aggregateEntry.getKey(); MetricHostAggregate aggregate = aggregateEntry.getValue(); @@ -928,7 +925,6 @@ public class PhoenixHBaseAccessor { stmt.setLong(4, clusterMetric.getTimestamp()); stmt.setString(5, clusterMetric.getType()); stmt.setDouble(6, aggregate.getSum()); -// stmt.setInt(7, aggregate.getNumberOfHosts()); stmt.setLong(7, aggregate.getNumberOfSamples()); stmt.setDouble(8, aggregate.getMax()); stmt.setDouble(9, aggregate.getMin()); 
http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Precision.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Precision.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Precision.java index c0e1ebc..ee0e87c 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Precision.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Precision.java @@ -26,7 +26,8 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline public enum Precision { SECONDS, MINUTES, - HOURS; + HOURS, + DAYS; public static class PrecisionFormatException extends IllegalArgumentException { public PrecisionFormatException(String message, Throwable cause) { @@ -41,7 +42,8 @@ public enum Precision { try { return Precision.valueOf(precision.toUpperCase()); } catch (IllegalArgumentException e) { - throw new PrecisionFormatException("precision should be seconds, minutes or hours", e); + throw new PrecisionFormatException("precision should be seconds, " + + "minutes, hours or days", e); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java index b72aa64..0595c20 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java @@ -54,15 +54,25 @@ public class TimelineMetricConfiguration { public static final String PRECISION_TABLE_TTL = "timeline.metrics.host.aggregator.ttl"; + public static final String HOST_MINUTE_TABLE_TTL = "timeline.metrics.host.aggregator.minute.ttl"; + + public static final String HOST_DAILY_TABLE_TTL = + "timeline.metrics.host.aggregator.daily.ttl"; + public static final String HOST_HOUR_TABLE_TTL = "timeline.metrics.host.aggregator.hourly.ttl"; + public static final String CLUSTER_MINUTE_TABLE_TTL = "timeline.metrics.cluster.aggregator.minute.ttl"; + public static final String CLUSTER_HOUR_TABLE_TTL = "timeline.metrics.cluster.aggregator.hourly.ttl"; + public static final String CLUSTER_DAILY_TABLE_TTL = + "timeline.metrics.cluster.aggregator.daily.ttl"; + public static final String CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL = "timeline.metrics.cluster.aggregator.minute.timeslice.interval"; @@ -78,26 +88,35 @@ public class TimelineMetricConfiguration { public static final String HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL = "timeline.metrics.host.aggregator.hourly.interval"; + public static final String HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL = + "timeline.metrics.host.aggregator.daily.interval"; + public static final String CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL = 
"timeline.metrics.cluster.aggregator.minute.interval"; public static final String CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL = "timeline.metrics.cluster.aggregator.hourly.interval"; + public static final String CLUSTER_AGGREGATOR_DAILY_SLEEP_INTERVAL = + "timeline.metrics.cluster.aggregator.daily.interval"; + public static final String HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER = "timeline.metrics.host.aggregator.minute.checkpointCutOffMultiplier"; public static final String HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER = "timeline.metrics.host.aggregator.hourly.checkpointCutOffMultiplier"; + public static final String HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER = + "timeline.metrics.host.aggregator.daily.checkpointCutOffMultiplier"; + public static final String CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER = "timeline.metrics.cluster.aggregator.minute.checkpointCutOffMultiplier"; public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER = "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffMultiplier"; - public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL = - "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffInterval"; + public static final String CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER = + "timeline.metrics.cluster.aggregator.daily.checkpointCutOffMultiplier"; public static final String GLOBAL_RESULT_LIMIT = "timeline.metrics.service.default.result.limit"; @@ -114,12 +133,18 @@ public class TimelineMetricConfiguration { public static final String HOST_AGGREGATOR_HOUR_DISABLED = "timeline.metrics.host.aggregator.hourly.disabled"; + public static final String HOST_AGGREGATOR_DAILY_DISABLED = + "timeline.metrics.host.aggregator.hourly.disabled"; + public static final String CLUSTER_AGGREGATOR_MINUTE_DISABLED = "timeline.metrics.cluster.aggregator.minute.disabled"; public static final String CLUSTER_AGGREGATOR_HOUR_DISABLED = 
"timeline.metrics.cluster.aggregator.hourly.disabled"; + public static final String CLUSTER_AGGREGATOR_DAILY_DISABLED = + "timeline.metrics.cluster.aggregator.daily.disabled"; + public static final String DISABLE_APPLICATION_TIMELINE_STORE = "timeline.service.disable.application.timeline.store"; http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java index 07717a8..415471d 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java @@ -41,19 +41,21 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti * Base class for all runnable aggregators. Provides common functions like * check pointing and scheduling. 
*/ -public abstract class AbstractTimelineAggregator implements Runnable { +public abstract class AbstractTimelineAggregator implements TimelineMetricAggregator { protected final PhoenixHBaseAccessor hBaseAccessor; private final Log LOG; - private Clock clock; protected final long checkpointDelayMillis; protected final Integer resultsetFetchSize; protected Configuration metricsConf; - public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor, - Configuration metricsConf) { - this(hBaseAccessor, metricsConf, new SystemClock()); - } + private String checkpointLocation; + private Long sleepIntervalMillis; + private Integer checkpointCutOffMultiplier; + private String aggregatorDisableParam; + protected String tableName; + protected String outputTableName; + protected Long nativeTimeRangeDelay; public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, Clock clk) { @@ -66,6 +68,30 @@ public abstract class AbstractTimelineAggregator implements Runnable { this.clock = clk; } + public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor, + Configuration metricsConf) { + this(hBaseAccessor, metricsConf, new SystemClock()); + } + + public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor, + Configuration metricsConf, + String checkpointLocation, + Long sleepIntervalMillis, + Integer checkpointCutOffMultiplier, + String aggregatorDisableParam, + String tableName, + String outputTableName, + Long nativeTimeRangeDelay) { + this(hBaseAccessor, metricsConf); + this.checkpointLocation = checkpointLocation; + this.sleepIntervalMillis = sleepIntervalMillis; + this.checkpointCutOffMultiplier = checkpointCutOffMultiplier; + this.aggregatorDisableParam = aggregatorDisableParam; + this.tableName = tableName; + this.outputTableName = outputTableName; + this.nativeTimeRangeDelay = nativeTimeRangeDelay; + } + @Override public void run() { LOG.info("Started Timeline aggregator thread @ " + new Date()); @@ -198,6 +224,7 
@@ public abstract class AbstractTimelineAggregator implements Runnable { * @param startTime Sample start time * @param endTime Sample end time */ + @Override public boolean doWork(long startTime, long endTime) { LOG.info("Start aggregation cycle @ " + new Date() + ", " + "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime)); @@ -257,18 +284,25 @@ public abstract class AbstractTimelineAggregator implements Runnable { protected abstract Condition prepareMetricQueryCondition(long startTime, long endTime); - protected abstract void aggregate(ResultSet rs, long startTime, long endTime) - throws IOException, SQLException; + protected abstract void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException; - protected abstract Long getSleepIntervalMillis(); + protected Long getSleepIntervalMillis() { + return sleepIntervalMillis; + } - protected abstract Integer getCheckpointCutOffMultiplier(); + protected Integer getCheckpointCutOffMultiplier() { + return checkpointCutOffMultiplier; + } protected Long getCheckpointCutOffIntervalMillis() { return getCheckpointCutOffMultiplier() * getSleepIntervalMillis(); } - public abstract boolean isDisabled(); + public boolean isDisabled() { + return metricsConf.getBoolean(aggregatorDisableParam, false); + } - protected abstract String getCheckpointLocation(); + protected String getCheckpointLocation() { + return checkpointLocation; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java deleted file mode 100644 index 3df88d2..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; - -import java.sql.ResultSet; -import java.sql.SQLException; - -public class TimelineClusterMetricReader { - - private boolean ignoreInstance; - - public TimelineClusterMetricReader(boolean ignoreInstance) { - this.ignoreInstance = ignoreInstance; - } - - public TimelineClusterMetric fromResultSet(ResultSet rs) - throws SQLException { - - return new TimelineClusterMetric( - rs.getString("METRIC_NAME"), - rs.getString("APP_ID"), - ignoreInstance ? 
null : rs.getString("INSTANCE_ID"), - rs.getLong("SERVER_TIME"), - rs.getString("UNITS")); - } -} - http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java index a2887ea..96be48d 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java @@ -1,3 +1,5 @@ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -6,142 +8,27 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; -import java.io.IOException; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.HashMap; -import java.util.Map; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL; - -public class TimelineMetricAggregator extends AbstractTimelineAggregator { - private static final Log LOG = LogFactory.getLog - (TimelineMetricAggregator.class); - - private final String checkpointLocation; - private final Long sleepIntervalMillis; - private final Integer checkpointCutOffMultiplier; - private final String hostAggregatorDisabledParam; - private final String tableName; - private final String outputTableName; - private final Long nativeTimeRangeDelay; - - public TimelineMetricAggregator(PhoenixHBaseAccessor hBaseAccessor, - Configuration metricsConf, - String checkpointLocation, - Long sleepIntervalMillis, - Integer checkpointCutOffMultiplier, - String hostAggregatorDisabledParam, - String tableName, - String outputTableName, - Long nativeTimeRangeDelay) { - super(hBaseAccessor, metricsConf); - this.checkpointLocation = checkpointLocation; - this.sleepIntervalMillis = 
sleepIntervalMillis; - this.checkpointCutOffMultiplier = checkpointCutOffMultiplier; - this.hostAggregatorDisabledParam = hostAggregatorDisabledParam; - this.tableName = tableName; - this.outputTableName = outputTableName; - this.nativeTimeRangeDelay = nativeTimeRangeDelay; - } - - @Override - protected String getCheckpointLocation() { - return checkpointLocation; - } - - @Override - protected void aggregate(ResultSet rs, long startTime, long endTime) - throws IOException, SQLException { - Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = - aggregateMetricsFromResultSet(rs); - - LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates."); - hBaseAccessor.saveHostAggregateRecords(hostAggregateMap, - outputTableName); - } - - @Override - protected Condition prepareMetricQueryCondition(long startTime, long endTime) { - Condition condition = new DefaultCondition(null, null, null, null, startTime, - endTime, null, null, true); - condition.setNoLimit(); - condition.setFetchSize(resultsetFetchSize); - condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, - PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay), - tableName)); - condition.addOrderByColumn("METRIC_NAME"); - condition.addOrderByColumn("HOSTNAME"); - condition.addOrderByColumn("APP_ID"); - condition.addOrderByColumn("INSTANCE_ID"); - condition.addOrderByColumn("SERVER_TIME"); - return condition; - } - - private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet - (ResultSet rs) throws IOException, SQLException { - TimelineMetric existingMetric = null; - MetricHostAggregate hostAggregate = null; - Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = - new HashMap<TimelineMetric, MetricHostAggregate>(); - - while (rs.next()) { - TimelineMetric currentMetric = - PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs); - MetricHostAggregate currentHostAggregate = - PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs); - - 
if (existingMetric == null) { - // First row - existingMetric = currentMetric; - hostAggregate = new MetricHostAggregate(); - hostAggregateMap.put(currentMetric, hostAggregate); - } - - if (existingMetric.equalsExceptTime(currentMetric)) { - // Recalculate totals with current metric - hostAggregate.updateAggregates(currentHostAggregate); - } else { - // Switched over to a new metric - save existing - create new aggregate - hostAggregate = new MetricHostAggregate(); - hostAggregate.updateAggregates(currentHostAggregate); - hostAggregateMap.put(currentMetric, hostAggregate); - existingMetric = currentMetric; - } - } - return hostAggregateMap; - } - - @Override - protected Long getSleepIntervalMillis() { - return sleepIntervalMillis; - } - - @Override - protected Integer getCheckpointCutOffMultiplier() { - return checkpointCutOffMultiplier; - } - - @Override - public boolean isDisabled() { - return metricsConf.getBoolean(hostAggregatorDisabledParam, false); - } +public interface TimelineMetricAggregator extends Runnable { + /** + * Aggregate metric data within the time bounds. + * @param startTime start time millis + * @param endTime end time millis + * @return success + */ + public boolean doWork(long startTime, long endTime); + + /** + * Is the aggregator disabled by configuration.
+ * @return true/false + */ + public boolean isDisabled(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java index a0e4e32..642fcfe 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java @@ -22,10 +22,20 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER; +import 
static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_DISABLED; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_SLEEP_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_DISABLED; +import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL; @@ -33,20 +43,42 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; +import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; +/** + * Factory class that knows how to create an aggregator instance using + * @TimelineMetricConfiguration + */ public class TimelineMetricAggregatorFactory { - private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE = + private static final String HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE = "timeline-metrics-host-aggregator-checkpoint"; - private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE = + private static final String HOST_AGGREGATE_HOURLY_CHECKPOINT_FILE = "timeline-metrics-host-aggregator-hourly-checkpoint"; + private static final String HOST_AGGREGATE_DAILY_CHECKPOINT_FILE = + "timeline-metrics-host-aggregator-daily-checkpoint"; + private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE = + "timeline-metrics-cluster-aggregator-checkpoint"; + private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE = + "timeline-metrics-cluster-aggregator-hourly-checkpoint"; + private static final String CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE = + "timeline-metrics-cluster-aggregator-daily-checkpoint"; + /** + * Minute based aggregation for hosts. 
+ */ public static TimelineMetricAggregator createTimelineMetricAggregatorMinute (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); String checkpointLocation = FilenameUtils.concat(checkpointDir, - MINUTE_AGGREGATE_CHECKPOINT_FILE); + HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE); long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l)); // 5 mins @@ -57,7 +89,7 @@ public class TimelineMetricAggregatorFactory { String inputTableName = METRICS_RECORD_TABLE_NAME; String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME; - return new TimelineMetricAggregator(hBaseAccessor, metricsConf, + return new TimelineMetricHostAggregator(hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, @@ -67,13 +99,16 @@ public class TimelineMetricAggregatorFactory { 120000l); } + /** + * Hourly aggregation for hosts. 
+ */ public static TimelineMetricAggregator createTimelineMetricAggregatorHourly (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); String checkpointLocation = FilenameUtils.concat(checkpointDir, - MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE); + HOST_AGGREGATE_HOURLY_CHECKPOINT_FILE); long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l)); @@ -84,7 +119,37 @@ public class TimelineMetricAggregatorFactory { String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME; String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME; - return new TimelineMetricAggregator(hBaseAccessor, metricsConf, + return new TimelineMetricHostAggregator(hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + hostAggregatorDisabledParam, + inputTableName, + outputTableName, + 3600000l); + } + + /** + * Daily aggregation for hosts. 
+ */ + public static TimelineMetricAggregator createTimelineMetricAggregatorDaily + (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + String checkpointLocation = FilenameUtils.concat(checkpointDir, + HOST_AGGREGATE_DAILY_CHECKPOINT_FILE); + long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL, 86400l)); + + int checkpointCutOffMultiplier = metricsConf.getInt + (HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER, 1); + String hostAggregatorDisabledParam = HOST_AGGREGATOR_DAILY_DISABLED; + + String inputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME; + String outputTableName = METRICS_AGGREGATE_DAILY_TABLE_NAME; + + return new TimelineMetricHostAggregator(hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, @@ -94,5 +159,110 @@ public class TimelineMetricAggregatorFactory { 3600000l); } + /** + * Minute based aggregation for cluster. 
+ */ + public static TimelineMetricAggregator createTimelineClusterAggregatorMinute( + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + + String checkpointLocation = FilenameUtils.concat(checkpointDir, + CLUSTER_AGGREGATOR_CHECKPOINT_FILE); + + long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 120l)); + + long timeSliceIntervalMillis = SECONDS.toMillis(metricsConf.getInt + (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 15)); + + int checkpointCutOffMultiplier = + metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2); + + String inputTableName = METRICS_RECORD_TABLE_NAME; + String outputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME; + String aggregatorDisabledParam = CLUSTER_AGGREGATOR_MINUTE_DISABLED; + + // Minute based aggregation has the added responsibility of time slicing + return new TimelineMetricClusterAggregatorMinute( + hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + aggregatorDisabledParam, + inputTableName, + outputTableName, + 120000l, + timeSliceIntervalMillis + ); + } + + /** + * Hourly aggregation for cluster. 
+ */ + public static TimelineMetricAggregator createTimelineClusterAggregatorHourly( + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + + String checkpointLocation = FilenameUtils.concat(checkpointDir, + CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE); + + long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l)); + + int checkpointCutOffMultiplier = metricsConf.getInt + (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2); + + String inputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME; + String outputTableName = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; + String aggregatorDisabledParam = CLUSTER_AGGREGATOR_HOUR_DISABLED; + + return new TimelineMetricClusterAggregator( + hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + aggregatorDisabledParam, + inputTableName, + outputTableName, + 120000l + ); + } + + /** + * Daily aggregation for cluster. 
+ */ + public static TimelineMetricAggregator createTimelineClusterAggregatorDaily( + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + + String checkpointLocation = FilenameUtils.concat(checkpointDir, + CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE); + + long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (CLUSTER_AGGREGATOR_DAILY_SLEEP_INTERVAL, 86400l)); + + int checkpointCutOffMultiplier = metricsConf.getInt + (CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER, 1); + String inputTableName = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; + String outputTableName = METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME; + String aggregatorDisabledParam = CLUSTER_AGGREGATOR_DAILY_DISABLED; + + return new TimelineMetricClusterAggregator( + hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + aggregatorDisabledParam, + inputTableName, + outputTableName, + 120000l + ); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java index 68b2ba9..9ed11e1 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java +++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java @@ -17,12 +17,9 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; - -import org.apache.commons.io.FilenameUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; @@ -30,74 +27,32 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; -import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_TIME_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; -/** - * Aggregates a metric across all hosts in the cluster. Reads metrics from - * the precision table and saves into the aggregate. 
- */ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator { + private final TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(true); private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregator.class); - private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE = - "timeline-metrics-cluster-aggregator-checkpoint"; - private final String checkpointLocation; - private final Long sleepIntervalMillis; - public final int timeSliceIntervalMillis; - private final Integer checkpointCutOffMultiplier; - private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true); - // Aggregator to perform app-level aggregates for host metrics - private final TimelineMetricAppAggregator appAggregator; + private final boolean isClusterPrecisionInputTable; public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor, - Configuration metricsConf) { - super(hBaseAccessor, metricsConf); - - String checkpointDir = metricsConf.get( - TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); - - checkpointLocation = FilenameUtils.concat(checkpointDir, - CLUSTER_AGGREGATOR_CHECKPOINT_FILE); - - sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 120l)); - timeSliceIntervalMillis = (int)SECONDS.toMillis(metricsConf.getInt - (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 15)); - checkpointCutOffMultiplier = - metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2); - - appAggregator = new TimelineMetricAppAggregator(metricsConf); - } - - @Override - protected String getCheckpointLocation() { - return checkpointLocation; - } - - @Override - protected void aggregate(ResultSet rs, long startTime, long endTime) - throws SQLException, IOException { - List<Long[]> timeSlices = getTimeSlices(startTime, endTime); - // Initialize app aggregates for host metrics - appAggregator.init(); - Map<TimelineClusterMetric, 
MetricClusterAggregate> - aggregateClusterMetrics = aggregateMetricsFromResultSet(rs, timeSlices); - - LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates."); - hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics); - appAggregator.cleanup(); + Configuration metricsConf, + String checkpointLocation, + Long sleepIntervalMillis, + Integer checkpointCutOffMultiplier, + String hostAggregatorDisabledParam, + String inputTableName, + String outputTableName, + Long nativeTimeRangeDelay) { + super(hBaseAccessor, metricsConf, checkpointLocation, + sleepIntervalMillis, checkpointCutOffMultiplier, + hostAggregatorDisabledParam, inputTableName, outputTableName, + nativeTimeRangeDelay); + isClusterPrecisionInputTable = inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME); } @Override @@ -106,9 +61,17 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator endTime, null, null, true); condition.setNoLimit(); condition.setFetchSize(resultsetFetchSize); - condition.setStatement(String.format(GET_METRIC_SQL, + String sqlStr = String.format(GET_CLUSTER_AGGREGATE_TIME_SQL, PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), - METRICS_RECORD_TABLE_NAME)); + tableName); + // HOST_COUNT vs METRIC_COUNT + if (isClusterPrecisionInputTable) { + sqlStr = String.format(GET_CLUSTER_AGGREGATE_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), + tableName); + } + + condition.setStatement(sqlStr); condition.addOrderByColumn("METRIC_NAME"); condition.addOrderByColumn("APP_ID"); condition.addOrderByColumn("INSTANCE_ID"); @@ -116,120 +79,58 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator return condition; } - private List<Long[]> getTimeSlices(long startTime, long endTime) { - List<Long[]> timeSlices = new ArrayList<Long[]>(); - long sliceStartTime = startTime; - while (sliceStartTime < endTime) { - timeSlices.add(new Long[] { 
sliceStartTime, sliceStartTime + timeSliceIntervalMillis}); - sliceStartTime += timeSliceIntervalMillis; - } - return timeSlices; - } - - private Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices) - throws SQLException, IOException { - Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = - new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); - // Create time slices - - while (rs.next()) { - TimelineMetric metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs); - - Map<TimelineClusterMetric, Double> clusterMetrics = - sliceFromTimelineMetric(metric, timeSlices); - - if (clusterMetrics != null && !clusterMetrics.isEmpty()) { - for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry : - clusterMetrics.entrySet()) { - - TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey(); - Double avgValue = clusterMetricEntry.getValue(); - - MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric); - - if (aggregate == null) { - aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue); - aggregateClusterMetrics.put(clusterMetric, aggregate); - } else { - aggregate.updateSum(avgValue); - aggregate.updateNumberOfHosts(1); - aggregate.updateMax(avgValue); - aggregate.updateMin(avgValue); - } - // Update app level aggregates - appAggregator.processTimelineClusterMetric(clusterMetric, - metric.getHostName(), avgValue); - } - } - } - // Add app level aggregates to save - aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics()); - return aggregateClusterMetrics; - } - @Override - protected Long getSleepIntervalMillis() { - return sleepIntervalMillis; - } + protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException { + Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs); - @Override - protected Integer 
getCheckpointCutOffMultiplier() { - return checkpointCutOffMultiplier; + LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates."); + hBaseAccessor.saveClusterTimeAggregateRecords(hostAggregateMap, outputTableName); } - @Override - public boolean isDisabled() { - return metricsConf.getBoolean(CLUSTER_AGGREGATOR_MINUTE_DISABLED, false); - } + private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs) + throws IOException, SQLException { - private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric( - TimelineMetric timelineMetric, List<Long[]> timeSlices) { + TimelineClusterMetric existingMetric = null; + MetricHostAggregate hostAggregate = null; + Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = + new HashMap<TimelineClusterMetric, MetricHostAggregate>(); - if (timelineMetric.getMetricValues().isEmpty()) { - return null; - } + while (rs.next()) { + TimelineClusterMetric currentMetric = readHelper.fromResultSet(rs); + + MetricClusterAggregate currentHostAggregate = + isClusterPrecisionInputTable ? 
+ readHelper.getMetricClusterAggregateFromResultSet(rs) : + readHelper.getMetricClusterTimeAggregateFromResultSet(rs); + + if (existingMetric == null) { + // First row + existingMetric = currentMetric; + hostAggregate = new MetricHostAggregate(); + hostAggregateMap.put(currentMetric, hostAggregate); + } - Map<TimelineClusterMetric, Double> timelineClusterMetricMap = - new HashMap<TimelineClusterMetric, Double>(); + if (existingMetric.equalsExceptTime(currentMetric)) { + // Recalculate totals with current metric + updateAggregatesFromHost(hostAggregate, currentHostAggregate); - for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) { - // TODO: investigate null values - pre filter - if (metric.getValue() == null) { - continue; - } - Long timestamp = getSliceTimeForMetric(timeSlices, - Long.parseLong(metric.getKey().toString())); - if (timestamp != -1) { - // Metric is within desired time range - TimelineClusterMetric clusterMetric = new TimelineClusterMetric( - timelineMetric.getMetricName(), - timelineMetric.getAppId(), - timelineMetric.getInstanceId(), - timestamp, - timelineMetric.getType()); - if (!timelineClusterMetricMap.containsKey(clusterMetric)) { - timelineClusterMetricMap.put(clusterMetric, metric.getValue()); - } else { - Double oldValue = timelineClusterMetricMap.get(clusterMetric); - Double newValue = (oldValue + metric.getValue()) / 2; - timelineClusterMetricMap.put(clusterMetric, newValue); - } + } else { + // Switched over to a new metric - save existing + hostAggregate = new MetricHostAggregate(); + updateAggregatesFromHost(hostAggregate, currentHostAggregate); + hostAggregateMap.put(currentMetric, hostAggregate); + existingMetric = currentMetric; } + } - return timelineClusterMetricMap; + return hostAggregateMap; } - /** - * Return beginning of the time slice into which the metric fits. 
- */ - private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) { - for (Long[] timeSlice : timeSlices) { - if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) { - return timeSlice[0]; - } - } - return -1l; + private void updateAggregatesFromHost(MetricHostAggregate agg, MetricClusterAggregate currentClusterAggregate) { + agg.updateMax(currentClusterAggregate.getMax()); + agg.updateMin(currentClusterAggregate.getMin()); + agg.updateSum(currentClusterAggregate.getSum()); + agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts()); } - } http://git-wip-us.apache.org/repos/asf/ambari/blob/67c425ac/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java deleted file mode 100644 index 264e4e6..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; - -import org.apache.commons.io.FilenameUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; -import java.io.IOException; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.HashMap; -import java.util.Map; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; -import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; - -public class TimelineMetricClusterAggregatorHourly extends AbstractTimelineAggregator { - private static final Log LOG = LogFactory.getLog - (TimelineMetricClusterAggregatorHourly.class); - private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE = - "timeline-metrics-cluster-aggregator-hourly-checkpoint"; - private final String checkpointLocation; - private final long sleepIntervalMillis; - private final Integer checkpointCutOffMultiplier; - private long checkpointCutOffIntervalMillis; - private static final Long NATIVE_TIME_RANGE_DELTA = 3600000l; // 1 hour - private final TimelineClusterMetricReader timelineClusterMetricReader - = new TimelineClusterMetricReader(true); - - public TimelineMetricClusterAggregatorHourly( - PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { - super(hBaseAccessor, metricsConf); - - String checkpointDir = metricsConf.get( - TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); - - checkpointLocation = FilenameUtils.concat(checkpointDir, - 
CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE); - - sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l)); - checkpointCutOffIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL, 7200l)); - checkpointCutOffMultiplier = metricsConf.getInt - (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2); - } - - @Override - protected String getCheckpointLocation() { - return checkpointLocation; - } - - @Override - protected void aggregate(ResultSet rs, long startTime, long endTime) - throws SQLException, IOException { - Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = - aggregateMetricsFromResultSet(rs); - - LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates."); - hBaseAccessor.saveClusterAggregateHourlyRecords(hostAggregateMap, - METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME); - } - - @Override - protected Condition prepareMetricQueryCondition(long startTime, - long endTime) { - Condition condition = new DefaultCondition(null, null, null, null, startTime, - endTime, null, null, true); - condition.setNoLimit(); - condition.setFetchSize(resultsetFetchSize); - condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, - PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), - METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); - condition.addOrderByColumn("METRIC_NAME"); - condition.addOrderByColumn("APP_ID"); - condition.addOrderByColumn("INSTANCE_ID"); - condition.addOrderByColumn("SERVER_TIME"); - return condition; - } - - private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs) - throws IOException, SQLException { - - TimelineClusterMetric existingMetric = null; - MetricHostAggregate hostAggregate = null; - Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = - new HashMap<TimelineClusterMetric, MetricHostAggregate>(); - - while (rs.next()) { - 
TimelineClusterMetric currentMetric = - timelineClusterMetricReader.fromResultSet(rs); - MetricClusterAggregate currentHostAggregate = - getMetricClusterAggregateFromResultSet(rs); - - if (existingMetric == null) { - // First row - existingMetric = currentMetric; - hostAggregate = new MetricHostAggregate(); - hostAggregateMap.put(currentMetric, hostAggregate); - } - - if (existingMetric.equalsExceptTime(currentMetric)) { - // Recalculate totals with current metric - updateAggregatesFromHost(hostAggregate, currentHostAggregate); - - } else { - // Switched over to a new metric - save existing - hostAggregate = new MetricHostAggregate(); - updateAggregatesFromHost(hostAggregate, currentHostAggregate); - hostAggregateMap.put(currentMetric, hostAggregate); - existingMetric = currentMetric; - } - - } - - return hostAggregateMap; - } - - private void updateAggregatesFromHost(MetricHostAggregate agg, MetricClusterAggregate currentClusterAggregate) { - agg.updateMax(currentClusterAggregate.getMax()); - agg.updateMin(currentClusterAggregate.getMin()); - agg.updateSum(currentClusterAggregate.getSum()); - agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts()); - } - - @Override - protected Long getSleepIntervalMillis() { - return sleepIntervalMillis; - } - - @Override - protected Integer getCheckpointCutOffMultiplier() { - return checkpointCutOffMultiplier; - } - - @Override - protected Long getCheckpointCutOffIntervalMillis() { - return checkpointCutOffIntervalMillis; - } - - @Override - public boolean isDisabled() { - return metricsConf.getBoolean(CLUSTER_AGGREGATOR_HOUR_DISABLED, false); - } - - -}