http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java deleted file mode 100644 index 02cc207..0000000 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - - -import org.codehaus.jackson.annotate.JsonCreator; -import org.codehaus.jackson.annotate.JsonProperty; - -/** - * Represents a collection of minute based aggregation of values for - * resolution greater than a minute. - */ -public class MetricHostAggregate extends MetricAggregate { - - private long numberOfSamples = 0; - - @JsonCreator - public MetricHostAggregate() { - super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE); - } - - public MetricHostAggregate(Double sum, int numberOfSamples, - Double deviation, - Double max, Double min) { - super(sum, deviation, max, min); - this.numberOfSamples = numberOfSamples; - } - - @JsonProperty("numberOfSamples") - long getNumberOfSamples() { - return numberOfSamples == 0 ? 1 : numberOfSamples; - } - - void updateNumberOfSamples(long count) { - this.numberOfSamples += count; - } - - public void setNumberOfSamples(long numberOfSamples) { - this.numberOfSamples = numberOfSamples; - } - - public double getAvg() { - return sum / numberOfSamples; - } - - /** - * Find and update min, max and avg for a minute - */ - void updateAggregates(MetricHostAggregate hostAggregate) { - updateMax(hostAggregate.getMax()); - updateMin(hostAggregate.getMin()); - updateSum(hostAggregate.getSum()); - updateNumberOfSamples(hostAggregate.getNumberOfSamples()); - } - - @Override - public String toString() { - return "MetricHostAggregate{" + - "sum=" + sum + - ", numberOfSamples=" + numberOfSamples + - ", deviation=" + deviation + - ", max=" + max + - ", min=" + min + - '}'; - } -}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java deleted file mode 100644 index 88a427a..0000000 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics - .timeline; - -/** - * RuntimeException for initialization of metrics schema. It is RuntimeException - * since this is a not recoverable situation, and should be handled by main or - * service method followed by shutdown. - */ -public class MetricsInitializationException extends RuntimeException { - public MetricsInitializationException() { - } - - public MetricsInitializationException(String msg) { - super(msg); - } - - public MetricsInitializationException(Throwable t) { - super(t); - } - - public MetricsInitializationException(String msg, Throwable t) { - super(msg, t); - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java deleted file mode 100644 index 4f248b7..0000000 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ /dev/null @@ -1,678 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.util.RetryCounter; -import org.apache.hadoop.hbase.util.RetryCounterFactory; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.hadoop.yarn.util.timeline.TimelineUtils; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.type.TypeReference; -import java.io.IOException; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_ENCODING; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_METRICS_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL; - -/** - * Provides a facade over the Phoenix API to access HBase schema - */ -public class PhoenixHBaseAccessor { - - private static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class); - private final Configuration hbaseConf; - private final Configuration metricsConf; - private final RetryCounterFactory retryCounterFactory; - - static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000; - /** - * 4 metrics/min * 60 * 24: Retrieve data for 1 day. - */ - private static final int METRICS_PER_MINUTE = 4; - public static int RESULTSET_LIMIT = (int)TimeUnit.DAYS.toMinutes(1) * - METRICS_PER_MINUTE; - private static ObjectMapper mapper = new ObjectMapper(); - - private static TypeReference<Map<Long, Double>> metricValuesTypeRef = - new TypeReference<Map<Long, Double>>() {}; - private final ConnectionProvider dataSource; - - public PhoenixHBaseAccessor(Configuration hbaseConf, - Configuration metricsConf){ - this(hbaseConf, metricsConf, new DefaultPhoenixDataSource(hbaseConf)); - } - - public PhoenixHBaseAccessor(Configuration hbaseConf, - Configuration metricsConf, - ConnectionProvider dataSource) { - this.hbaseConf = hbaseConf; - this.metricsConf = metricsConf; - RESULTSET_LIMIT = metricsConf.getInt(GLOBAL_RESULT_LIMIT, 5760); - try { - Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); - } catch (ClassNotFoundException e) { - LOG.error("Phoenix client jar not found in the classpath.", e); - throw new IllegalStateException(e); - } - this.dataSource = dataSource; - this.retryCounterFactory = new RetryCounterFactory( - metricsConf.getInt(GLOBAL_MAX_RETRIES, 10), - (int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 5))); - } - - - private Connection getConnectionRetryingOnException() - throws SQLException, InterruptedException { - RetryCounter retryCounter = retryCounterFactory.create(); - while (true) { - try{ - return getConnection(); - } catch (SQLException e) { - if(!retryCounter.shouldRetry()){ - LOG.error("HBaseAccessor getConnection failed after " - + retryCounter.getMaxAttempts() + " attempts"); - throw e; - } - } - retryCounter.sleepUntilNextRetry(); - } - } - - - /** - * Get JDBC connection to HBase store. Assumption is that the hbase - * configuration is present on the classpath and loaded by the caller into - * the Configuration object. - * Phoenix already caches the HConnection between the client and HBase - * cluster. - * - * @return @java.sql.Connection - */ - public Connection getConnection() throws SQLException { - return dataSource.getConnection(); - } - - public static Map readMetricFromJSON(String json) throws IOException { - return mapper.readValue(json, metricValuesTypeRef); - } - - @SuppressWarnings("unchecked") - static TimelineMetric getTimelineMetricFromResultSet(ResultSet rs) - throws SQLException, IOException { - TimelineMetric metric = new TimelineMetric(); - metric.setMetricName(rs.getString("METRIC_NAME")); - metric.setAppId(rs.getString("APP_ID")); - metric.setInstanceId(rs.getString("INSTANCE_ID")); - metric.setHostName(rs.getString("HOSTNAME")); - metric.setTimestamp(rs.getLong("SERVER_TIME")); - metric.setStartTime(rs.getLong("START_TIME")); - metric.setType(rs.getString("UNITS")); - metric.setMetricValues( - (Map<Long, Double>) readMetricFromJSON(rs.getString("METRICS"))); - return metric; - } - - static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs) - throws SQLException, IOException { - TimelineMetric metric = new TimelineMetric(); - metric.setMetricName(rs.getString("METRIC_NAME")); - metric.setAppId(rs.getString("APP_ID")); - metric.setInstanceId(rs.getString("INSTANCE_ID")); - metric.setHostName(rs.getString("HOSTNAME")); - metric.setTimestamp(rs.getLong("SERVER_TIME")); - metric.setType(rs.getString("UNITS")); - return metric; - } - - static MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs) - throws SQLException { - MetricHostAggregate metricHostAggregate = new MetricHostAggregate(); - metricHostAggregate.setSum(rs.getDouble("METRIC_SUM")); - metricHostAggregate.setMax(rs.getDouble("METRIC_MAX")); - metricHostAggregate.setMin(rs.getDouble("METRIC_MIN")); - metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT")); - - metricHostAggregate.setDeviation(0.0); - return metricHostAggregate; - } - - static TimelineClusterMetric - getTimelineMetricClusterKeyFromResultSet(ResultSet rs) - throws SQLException, IOException { - TimelineClusterMetric metric = new TimelineClusterMetric( - rs.getString("METRIC_NAME"), - rs.getString("APP_ID"), - rs.getString("INSTANCE_ID"), - rs.getLong("SERVER_TIME"), - rs.getString("UNITS")); - - return metric; - } - - static MetricClusterAggregate - getMetricClusterAggregateFromResultSet(ResultSet rs) - throws SQLException { - MetricClusterAggregate agg = new MetricClusterAggregate(); - agg.setSum(rs.getDouble("METRIC_SUM")); - agg.setMax(rs.getDouble("METRIC_MAX")); - agg.setMin(rs.getDouble("METRIC_MIN")); - agg.setNumberOfHosts(rs.getInt("HOSTS_COUNT")); - - agg.setDeviation(0.0); - - return agg; - } - - protected void initMetricSchema() { - Connection conn = null; - Statement stmt = null; - - String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING); - String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, DEFAULT_TABLE_COMPRESSION); - String precisionTtl = metricsConf.get(PRECISION_TABLE_TTL, "86400"); - String hostMinTtl = metricsConf.get(HOST_MINUTE_TABLE_TTL, "604800"); - String hostHourTtl = metricsConf.get(HOST_HOUR_TABLE_TTL, "2592000"); - String clusterMinTtl = metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "2592000"); - String clusterHourTtl = metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "31536000"); - - try { - LOG.info("Initializing metrics schema..."); - conn = getConnectionRetryingOnException(); - stmt = conn.createStatement(); - - stmt.executeUpdate(String.format(CREATE_METRICS_TABLE_SQL, - encoding, precisionTtl, compression)); - stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL, - encoding, hostHourTtl, compression)); - stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL, - encoding, hostMinTtl, compression)); - stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL, - encoding, clusterMinTtl, compression)); - stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL, - encoding, clusterHourTtl, compression)); - conn.commit(); - } catch (SQLException sql) { - LOG.warn("Error creating Metrics Schema in HBase using Phoenix.", sql); - throw new MetricsInitializationException( - "Error creating Metrics Schema in HBase using Phoenix.", sql); - } catch (InterruptedException e) { - LOG.warn("Error creating Metrics Schema in HBase using Phoenix.", e); - throw new MetricsInitializationException( - "Error creating Metrics Schema in HBase using Phoenix.", e); - } finally { - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - // Ignore - } - } - if (conn != null) { - try { - conn.close(); - } catch (SQLException e) { - // Ignore - } - } - } - } - - public void insertMetricRecords(TimelineMetrics metrics) - throws SQLException, IOException { - - List<TimelineMetric> timelineMetrics = metrics.getMetrics(); - if (timelineMetrics == null || timelineMetrics.isEmpty()) { - LOG.debug("Empty metrics insert request."); - return; - } - - Connection conn = getConnection(); - PreparedStatement metricRecordStmt = null; - long currentTime = System.currentTimeMillis(); - - try { - metricRecordStmt = conn.prepareStatement(String.format( - UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME)); - - for (TimelineMetric metric : timelineMetrics) { - metricRecordStmt.clearParameters(); - - LOG.trace("host: " + metric.getHostName() + ", " + - "metricName = " + metric.getMetricName() + ", " + - "values: " + metric.getMetricValues()); - Aggregator agg = new Aggregator(); - double[] aggregates = agg.calculateAggregates( - metric.getMetricValues()); - - metricRecordStmt.setString(1, metric.getMetricName()); - metricRecordStmt.setString(2, metric.getHostName()); - metricRecordStmt.setString(3, metric.getAppId()); - metricRecordStmt.setString(4, metric.getInstanceId()); - metricRecordStmt.setLong(5, currentTime); - metricRecordStmt.setLong(6, metric.getStartTime()); - metricRecordStmt.setString(7, metric.getType()); - metricRecordStmt.setDouble(8, aggregates[0]); - metricRecordStmt.setDouble(9, aggregates[1]); - metricRecordStmt.setDouble(10, aggregates[2]); - metricRecordStmt.setLong(11, (long)aggregates[3]); - String json = - TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues()); - metricRecordStmt.setString(12, json); - - try { - metricRecordStmt.executeUpdate(); - } catch (SQLException sql) { - LOG.error(sql); - } - } - - conn.commit(); - - } finally { - if (metricRecordStmt != null) { - try { - metricRecordStmt.close(); - } catch (SQLException e) { - // Ignore - } - } - if (conn != null) { - try { - conn.close(); - } catch (SQLException sql) { - // Ignore - } - } - } - } - - - @SuppressWarnings("unchecked") - public TimelineMetrics getMetricRecords(final Condition condition) - throws SQLException, IOException { - - if (condition.isEmpty()) { - throw new SQLException("No filter criteria specified."); - } - - Connection conn = getConnection(); - PreparedStatement stmt = null; - TimelineMetrics metrics = new TimelineMetrics(); - - try { - stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); - - ResultSet rs = stmt.executeQuery(); - - while (rs.next()) { - TimelineMetric metric = getTimelineMetricFromResultSet(rs); - - if (condition.isGrouped()) { - metrics.addOrMergeTimelineMetric(metric); - } else { - metrics.getMetrics().add(metric); - } - } - - } finally { - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - // Ignore - } - } - if (conn != null) { - try { - conn.close(); - } catch (SQLException sql) { - // Ignore - } - } - } - return metrics; - } - - public void saveHostAggregateRecords(Map<TimelineMetric, - MetricHostAggregate> hostAggregateMap, String phoenixTableName) - throws SQLException { - - if (hostAggregateMap != null && !hostAggregateMap.isEmpty()) { - Connection conn = getConnection(); - PreparedStatement stmt = null; - - long start = System.currentTimeMillis(); - int rowCount = 0; - - try { - stmt = conn.prepareStatement( - String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName)); - - for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate : - hostAggregateMap.entrySet()) { - - TimelineMetric metric = metricAggregate.getKey(); - MetricHostAggregate hostAggregate = metricAggregate.getValue(); - - rowCount++; - stmt.clearParameters(); - stmt.setString(1, metric.getMetricName()); - stmt.setString(2, metric.getHostName()); - stmt.setString(3, metric.getAppId()); - stmt.setString(4, metric.getInstanceId()); - stmt.setLong(5, metric.getTimestamp()); - stmt.setString(6, metric.getType()); - stmt.setDouble(7, hostAggregate.getSum()); - stmt.setDouble(8, hostAggregate.getMax()); - stmt.setDouble(9, hostAggregate.getMin()); - stmt.setDouble(10, hostAggregate.getNumberOfSamples()); - - try { - // TODO: Why this exception is swallowed - stmt.executeUpdate(); - } catch (SQLException sql) { - LOG.error(sql); - } - - if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) { - conn.commit(); - rowCount = 0; - } - - } - - conn.commit(); - - } finally { - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - // Ignore - } - } - if (conn != null) { - try { - conn.close(); - } catch (SQLException sql) { - // Ignore - } - } - } - - long end = System.currentTimeMillis(); - - if ((end - start) > 60000l) { - LOG.info("Time to save map: " + (end - start) + ", " + - "thread = " + Thread.currentThread().getClass()); - } - } - } - - /** - * Save Metric aggregate records. - * - * @throws SQLException - */ - public void saveClusterAggregateRecords( - Map<TimelineClusterMetric, MetricClusterAggregate> records) - throws SQLException { - - if (records == null || records.isEmpty()) { - LOG.debug("Empty aggregate records."); - return; - } - - long start = System.currentTimeMillis(); - - Connection conn = getConnection(); - PreparedStatement stmt = null; - try { - stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL); - int rowCount = 0; - - for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> - aggregateEntry : records.entrySet()) { - TimelineClusterMetric clusterMetric = aggregateEntry.getKey(); - MetricClusterAggregate aggregate = aggregateEntry.getValue(); - - LOG.trace("clusterMetric = " + clusterMetric + ", " + - "aggregate = " + aggregate); - - rowCount++; - stmt.clearParameters(); - stmt.setString(1, clusterMetric.getMetricName()); - stmt.setString(2, clusterMetric.getAppId()); - stmt.setString(3, clusterMetric.getInstanceId()); - stmt.setLong(4, clusterMetric.getTimestamp()); - stmt.setString(5, clusterMetric.getType()); - stmt.setDouble(6, aggregate.getSum()); - stmt.setInt(7, aggregate.getNumberOfHosts()); - stmt.setDouble(8, aggregate.getMax()); - stmt.setDouble(9, aggregate.getMin()); - - try { - stmt.executeUpdate(); - } catch (SQLException sql) { - // TODO: Why this exception is swallowed - LOG.error(sql); - } - - if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) { - conn.commit(); - rowCount = 0; - } - } - - conn.commit(); - - } finally { - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - // Ignore - } - } - if (conn != null) { - try { - conn.close(); - } catch (SQLException sql) { - // Ignore - } - } - } - long end = System.currentTimeMillis(); - if ((end - start) > 60000l) { - LOG.info("Time to save: " + (end - start) + ", " + - "thread = " + Thread.currentThread().getName()); - } - } - - /** - * Save Metric aggregate records. - * - * @throws SQLException - */ - public void saveClusterAggregateHourlyRecords( - Map<TimelineClusterMetric, MetricHostAggregate> records, - String tableName) - throws SQLException { - if (records == null || records.isEmpty()) { - LOG.debug("Empty aggregate records."); - return; - } - - long start = System.currentTimeMillis(); - - Connection conn = getConnection(); - PreparedStatement stmt = null; - try { - stmt = conn.prepareStatement(String.format - (UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName)); - int rowCount = 0; - - for (Map.Entry<TimelineClusterMetric, MetricHostAggregate> - aggregateEntry : records.entrySet()) { - TimelineClusterMetric clusterMetric = aggregateEntry.getKey(); - MetricHostAggregate aggregate = aggregateEntry.getValue(); - - LOG.trace("clusterMetric = " + clusterMetric + ", " + - "aggregate = " + aggregate); - - rowCount++; - stmt.clearParameters(); - stmt.setString(1, clusterMetric.getMetricName()); - stmt.setString(2, clusterMetric.getAppId()); - stmt.setString(3, clusterMetric.getInstanceId()); - stmt.setLong(4, clusterMetric.getTimestamp()); - stmt.setString(5, clusterMetric.getType()); - stmt.setDouble(6, aggregate.getSum()); -// stmt.setInt(7, aggregate.getNumberOfHosts()); - stmt.setLong(7, aggregate.getNumberOfSamples()); - stmt.setDouble(8, aggregate.getMax()); - stmt.setDouble(9, aggregate.getMin()); - - try { - stmt.executeUpdate(); - } catch (SQLException sql) { - // we have no way to verify it works!!! - LOG.error(sql); - } - - if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) { - conn.commit(); - rowCount = 0; - } - } - - conn.commit(); - - } finally { - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - // Ignore - } - } - if (conn != null) { - try { - conn.close(); - } catch (SQLException sql) { - // Ignore - } - } - } - long end = System.currentTimeMillis(); - if ((end - start) > 60000l) { - LOG.info("Time to save: " + (end - start) + ", " + - "thread = " + Thread.currentThread().getName()); - } - } - - - public TimelineMetrics getAggregateMetricRecords(final Condition condition) - throws SQLException { - - if (condition.isEmpty()) { - throw new SQLException("No filter criteria specified."); - } - - Connection conn = getConnection(); - PreparedStatement stmt = null; - TimelineMetrics metrics = new TimelineMetrics(); - - try { - stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition); - - ResultSet rs = stmt.executeQuery(); - - while (rs.next()) { - TimelineMetric metric = new TimelineMetric(); - metric.setMetricName(rs.getString("METRIC_NAME")); - metric.setAppId(rs.getString("APP_ID")); - metric.setInstanceId(rs.getString("INSTANCE_ID")); - metric.setTimestamp(rs.getLong("SERVER_TIME")); - metric.setStartTime(rs.getLong("SERVER_TIME")); - Map<Long, Double> valueMap = new HashMap<Long, Double>(); - valueMap.put(rs.getLong("SERVER_TIME"), - rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT")); - metric.setMetricValues(valueMap); - - if (condition.isGrouped()) { - metrics.addOrMergeTimelineMetric(metric); - } else { - metrics.getMetrics().add(metric); - } - } - - } finally { - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - // Ignore - } - } - if (conn != null) { - try { - conn.close(); - } catch (SQLException sql) { - // Ignore - } - } - } - LOG.info("Aggregate records size: " + metrics.getMetrics().size()); - return metrics; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java deleted file mode 100644 index 0d53f5f..0000000 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java +++ /dev/null @@ -1,528 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Set; - -/** - * Encapsulate all metrics related SQL queries. - */ -public class PhoenixTransactSQL { - - static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class); - // TODO: Configurable TTL values - /** - * Create table to store individual metric records. - */ - public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " + - "EXISTS METRIC_RECORD (METRIC_NAME VARCHAR, " + - "HOSTNAME VARCHAR, " + - "SERVER_TIME UNSIGNED_LONG NOT NULL, " + - "APP_ID VARCHAR, " + - "INSTANCE_ID VARCHAR, " + - "START_TIME UNSIGNED_LONG, " + - "UNITS CHAR(20), " + - "METRIC_SUM DOUBLE, " + - "METRIC_COUNT UNSIGNED_INT, " + - "METRIC_MAX DOUBLE, " + - "METRIC_MIN DOUBLE, " + - "METRICS VARCHAR CONSTRAINT pk " + - "PRIMARY KEY (METRIC_NAME, HOSTNAME, SERVER_TIME, APP_ID, " + - "INSTANCE_ID)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + - "TTL=%s, COMPRESSION='%s'"; - - public static final String CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL = - "CREATE TABLE IF NOT EXISTS METRIC_RECORD_HOURLY " + - "(METRIC_NAME VARCHAR, " + - "HOSTNAME VARCHAR, " + - "APP_ID VARCHAR, " + - "INSTANCE_ID VARCHAR, " + - "SERVER_TIME UNSIGNED_LONG NOT NULL, " + - "UNITS CHAR(20), " + - "METRIC_SUM DOUBLE," + - "METRIC_COUNT UNSIGNED_INT, " + - "METRIC_MAX DOUBLE," + - "METRIC_MIN DOUBLE CONSTRAINT pk " + - "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " + - "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + - "TTL=%s, COMPRESSION='%s'"; - - public static final String CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL = - "CREATE TABLE IF NOT EXISTS METRIC_RECORD_MINUTE " + - "(METRIC_NAME VARCHAR, " + - "HOSTNAME VARCHAR, " + - "APP_ID VARCHAR, " + - "INSTANCE_ID VARCHAR, " + - "SERVER_TIME UNSIGNED_LONG NOT NULL, " + - "UNITS CHAR(20), " + - "METRIC_SUM DOUBLE," + - "METRIC_COUNT UNSIGNED_INT, " + - "METRIC_MAX DOUBLE," + - "METRIC_MIN DOUBLE CONSTRAINT pk " + - "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " + - "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," + - " COMPRESSION='%s'"; - - public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL = - "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE " + - "(METRIC_NAME VARCHAR, " + - "APP_ID VARCHAR, " + - "INSTANCE_ID VARCHAR, " + - "SERVER_TIME UNSIGNED_LONG NOT NULL, " + - "UNITS CHAR(20), " + - "METRIC_SUM DOUBLE, " + - "HOSTS_COUNT UNSIGNED_INT, " + - "METRIC_MAX DOUBLE, " + - "METRIC_MIN DOUBLE " + - "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " + - "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + - "TTL=%s, COMPRESSION='%s'"; - - public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL = - "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE_HOURLY " + - "(METRIC_NAME VARCHAR, " + - "APP_ID VARCHAR, " + - "INSTANCE_ID VARCHAR, " + - "SERVER_TIME UNSIGNED_LONG NOT NULL, " + - "UNITS CHAR(20), " + - "METRIC_SUM DOUBLE, " + - "METRIC_COUNT UNSIGNED_INT, " + - "METRIC_MAX DOUBLE, " + - "METRIC_MIN DOUBLE " + - "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " + - "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + - "TTL=%s, COMPRESSION='%s'"; - - /** - * Insert into metric records table. - */ - public static final String UPSERT_METRICS_SQL = "UPSERT INTO %s " + - "(METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, " + - "UNITS, " + - "METRIC_SUM, " + - "METRIC_MAX, " + - "METRIC_MIN, " + - "METRIC_COUNT, " + - "METRICS) VALUES " + - "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; - - public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " + - "METRIC_AGGREGATE (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " + - "UNITS, " + - "METRIC_SUM, " + - "HOSTS_COUNT, " + - "METRIC_MAX, " + - "METRIC_MIN) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; - - public static final String UPSERT_CLUSTER_AGGREGATE_TIME_SQL = "UPSERT INTO" + - " %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " + - "UNITS, " + - "METRIC_SUM, " + - "METRIC_COUNT, " + - "METRIC_MAX, " + - "METRIC_MIN) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; - - - public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " + - "%s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " + - "SERVER_TIME, " + - "UNITS, " + - "METRIC_SUM, " + - "METRIC_MAX, " + - "METRIC_MIN," + - "METRIC_COUNT) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; - - /** - * Retrieve a set of rows from metrics records table. - */ - public static final String GET_METRIC_SQL = "SELECT %s METRIC_NAME, " + - "HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, UNITS, " + - "METRIC_SUM, " + - "METRIC_MAX, " + - "METRIC_MIN, " + - "METRIC_COUNT, " + - "METRICS " + - "FROM %s"; - - public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT %s " + - "METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, " + - "UNITS, " + - "METRIC_SUM, " + - "METRIC_MAX, " + - "METRIC_MIN, " + - "METRIC_COUNT " + - "FROM %s"; - - public static final String GET_CLUSTER_AGGREGATE_SQL = "SELECT %s " + - "METRIC_NAME, APP_ID, " + - "INSTANCE_ID, SERVER_TIME, " + - "UNITS, " + - "METRIC_SUM, " + - "HOSTS_COUNT, " + - "METRIC_MAX, " + - "METRIC_MIN " + - "FROM METRIC_AGGREGATE"; - - public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD"; - public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME = - "METRIC_RECORD_MINUTE"; - public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME = - "METRIC_RECORD_HOURLY"; - public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME = - "METRIC_AGGREGATE"; - public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME = - "METRIC_AGGREGATE_HOURLY"; - public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY"; - public static final String DEFAULT_ENCODING = "FAST_DIFF"; - public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes - - /** Filter to optimize HBase scan by using file timestamps. This prevents - * a full table scan of metric records. - * @return Phoenix Hint String - */ - public static String getNaiveTimeRangeHint(Long startTime, Long delta) { - return String.format("/*+ NATIVE_TIME_RANGE(%s) */", (startTime - delta)); - } - - public static PreparedStatement prepareGetMetricsSqlStmt( - Connection connection, Condition condition) throws SQLException { - - if (condition.isEmpty()) { - throw new IllegalArgumentException("Condition is empty."); - } - String stmtStr; - if (condition.getStatement() != null) { - stmtStr = condition.getStatement(); - } else { - stmtStr = String.format(GET_METRIC_SQL, - getNaiveTimeRangeHint(condition.getStartTime(), NATIVE_TIME_RANGE_DELTA), - METRICS_RECORD_TABLE_NAME); - } - - StringBuilder sb = new StringBuilder(stmtStr); - sb.append(" WHERE "); - sb.append(condition.getConditionClause()); - String orderByClause = condition.getOrderByClause(); - - if (orderByClause != null) { - sb.append(orderByClause); - } else { - sb.append(" ORDER BY METRIC_NAME, SERVER_TIME "); - } - if (condition.getLimit() != null) { - sb.append(" LIMIT ").append(condition.getLimit()); - } - - LOG.debug("SQL: " + sb.toString() + ", condition: " + condition); - PreparedStatement stmt = connection.prepareStatement(sb.toString()); - int pos = 1; - if (condition.getMetricNames() != null) { - for (; pos <= condition.getMetricNames().size(); pos++) { - LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1)); - stmt.setString(pos, condition.getMetricNames().get(pos - 1)); - } - } - if (condition.getHostname() != null) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname()); - stmt.setString(pos++, condition.getHostname()); - } - // TODO: Upper case all strings on POST - if (condition.getAppId() != null) { - // TODO: fix case of appId coming from host metrics - String appId = condition.getAppId(); - if (!condition.getAppId().equals("HOST")) { - appId = appId.toLowerCase(); - } - LOG.debug("Setting pos: " + pos + ", value: " + appId); - stmt.setString(pos++, appId); - } - if (condition.getInstanceId() != null) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId()); - stmt.setString(pos++, condition.getInstanceId()); - } - if (condition.getStartTime() != null) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime()); - stmt.setLong(pos++, condition.getStartTime()); - } - if (condition.getEndTime() != null) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime()); - stmt.setLong(pos, condition.getEndTime()); - } - if (condition.getFetchSize() != null) { - stmt.setFetchSize(condition.getFetchSize()); - } - - return stmt; - } - - - public static PreparedStatement prepareGetAggregateSqlStmt( - Connection connection, Condition condition) throws SQLException { - - if (condition.isEmpty()) { - throw new IllegalArgumentException("Condition is empty."); - } - - StringBuilder sb = new StringBuilder(GET_CLUSTER_AGGREGATE_SQL); - sb.append(" WHERE "); - sb.append(condition.getConditionClause()); - sb.append(" ORDER BY METRIC_NAME, SERVER_TIME"); - if (condition.getLimit() != null) { - sb.append(" LIMIT ").append(condition.getLimit()); - } - - LOG.debug("SQL => " + sb.toString() + ", condition => " + condition); - PreparedStatement stmt = connection.prepareStatement(sb.toString()); - int pos = 1; - if (condition.getMetricNames() != null) { - for (; pos <= condition.getMetricNames().size(); pos++) { - stmt.setString(pos, condition.getMetricNames().get(pos - 1)); - } - } - // TODO: Upper case all strings on POST - if (condition.getAppId() != null) { - stmt.setString(pos++, condition.getAppId().toLowerCase()); - } - if (condition.getInstanceId() != null) { - stmt.setString(pos++, condition.getInstanceId()); - } - if (condition.getStartTime() != null) { - stmt.setLong(pos++, condition.getStartTime()); - } - if (condition.getEndTime() != null) { - stmt.setLong(pos, condition.getEndTime()); - } - - return stmt; - } - - static class Condition { - List<String> metricNames; - String hostname; - String appId; - String instanceId; - Long startTime; - Long endTime; - Integer limit; - boolean grouped; - boolean noLimit = false; - Integer fetchSize; - String statement; - Set<String> orderByColumns = new LinkedHashSet<String>(); - - Condition(List<String> metricNames, String hostname, String appId, - String instanceId, Long startTime, Long endTime, Integer limit, - boolean grouped) { - this.metricNames = metricNames; - this.hostname = hostname; - this.appId = appId; - this.instanceId = instanceId; - this.startTime = startTime; - this.endTime = endTime; - this.limit = limit; - this.grouped = grouped; - } - - String getStatement() { - return statement; - } - - void setStatement(String statement) { - this.statement = statement; - } - - List<String> getMetricNames() { - return metricNames == null || metricNames.isEmpty() ? null : metricNames; - } - - String getMetricsClause() { - StringBuilder sb = new StringBuilder("("); - if (metricNames != null) { - for (String name : metricNames) { - if (sb.length() != 1) { - sb.append(", "); - } - sb.append("?"); - } - sb.append(")"); - return sb.toString(); - } else { - return null; - } - } - - String getConditionClause() { - StringBuilder sb = new StringBuilder(); - boolean appendConjunction = false; - - if (getMetricNames() != null) { - sb.append("METRIC_NAME IN "); - sb.append(getMetricsClause()); - appendConjunction = true; - } - if (appendConjunction) { - sb.append(" AND"); - } - appendConjunction = false; - if (getHostname() != null) { - sb.append(" HOSTNAME = ?"); - appendConjunction = true; - } - if (appendConjunction) { - sb.append(" AND"); - } - appendConjunction = false; - if (getAppId() != null) { - sb.append(" APP_ID = ?"); - appendConjunction = true; - } - if (appendConjunction) { - sb.append(" AND"); - } - appendConjunction = false; - if (getInstanceId() != null) { - sb.append(" INSTANCE_ID = ?"); - appendConjunction = true; - } - if (appendConjunction) { - sb.append(" AND"); - } - appendConjunction = false; - if (getStartTime() != null) { - sb.append(" SERVER_TIME >= ?"); - appendConjunction = true; - } - if (appendConjunction) { - sb.append(" AND"); - } - if (getEndTime() != null) { - sb.append(" SERVER_TIME < ?"); - } - return sb.toString(); - } - - String getHostname() { - return hostname == null || hostname.isEmpty() ? null : hostname; - } - - String getAppId() { - return appId == null || appId.isEmpty() ? null : appId; - } - - String getInstanceId() { - return instanceId == null || instanceId.isEmpty() ? null : instanceId; - } - - /** - * Convert to millis. - */ - Long getStartTime() { - if (startTime < 9999999999l) { - return startTime * 1000; - } else { - return startTime; - } - } - - Long getEndTime() { - if (endTime < 9999999999l) { - return endTime * 1000; - } else { - return endTime; - } - } - - void setNoLimit() { - this.noLimit = true; - } - - Integer getLimit() { - if (noLimit) { - return null; - } - return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit; - } - - boolean isGrouped() { - return grouped; - } - - boolean isEmpty() { - return (metricNames == null || metricNames.isEmpty()) - && (hostname == null || hostname.isEmpty()) - && (appId == null || appId.isEmpty()) - && (instanceId == null || instanceId.isEmpty()) - && startTime == null - && endTime == null; - } - - Integer getFetchSize() { - return fetchSize; - } - - void setFetchSize(Integer fetchSize) { - this.fetchSize = fetchSize; - } - - void addOrderByColumn(String column) { - orderByColumns.add(column); - } - - String getOrderByClause() { - String orderByStr = " ORDER BY "; - if (!orderByColumns.isEmpty()) { - StringBuilder sb = new StringBuilder(orderByStr); - for (String orderByColumn : orderByColumns) { - if (sb.length() != orderByStr.length()) { - sb.append(", "); - } - sb.append(orderByColumn); - } - sb.append(" "); - return sb.toString(); - } - return null; - } - - @Override - public String toString() { - return "Condition{" + - "metricNames=" + metricNames + - ", hostname='" + hostname + '\'' + - ", appId='" + appId + '\'' + - ", instanceId='" + instanceId + '\'' + - ", startTime=" + startTime + - ", endTime=" + endTime + - ", limit=" + limit + - ", grouped=" + grouped + - ", orderBy=" + orderByColumns + - ", noLimit=" + noLimit + - '}'; - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java deleted file mode 100644 index d227993..0000000 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - -public class TimelineClusterMetric { - private String metricName; - private String appId; - private String instanceId; - private long timestamp; - private String type; - - TimelineClusterMetric(String metricName, String appId, String instanceId, - long timestamp, String type) { - this.metricName = metricName; - this.appId = appId; - this.instanceId = instanceId; - this.timestamp = timestamp; - this.type = type; - } - - String getMetricName() { - return metricName; - } - - String getAppId() { - return appId; - } - - String getInstanceId() { - return instanceId; - } - - long getTimestamp() { - return timestamp; - } - - String getType() { return type; } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TimelineClusterMetric that = (TimelineClusterMetric) o; - - if (timestamp != that.timestamp) return false; - if (appId != null ? !appId.equals(that.appId) : that.appId != null) - return false; - if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null) - return false; - if (!metricName.equals(that.metricName)) return false; - - return true; - } - - public boolean equalsExceptTime(TimelineClusterMetric metric) { - if (!metricName.equals(metric.metricName)) return false; - if (!appId.equals(metric.appId)) return false; - if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null) - return false; - - return true; - } - @Override - public int hashCode() { - int result = metricName.hashCode(); - result = 31 * result + (appId != null ? appId.hashCode() : 0); - result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0); - result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); - return result; - } - - @Override - public String toString() { - return "TimelineClusterMetric{" + - "metricName='" + metricName + '\'' + - ", appId='" + appId + '\'' + - ", instanceId='" + instanceId + '\'' + - ", timestamp=" + timestamp + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java deleted file mode 100644 index cab154b..0000000 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import java.io.IOException; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.HashMap; -import java.util.Map; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL; - -public class TimelineMetricAggregator extends AbstractTimelineAggregator { - private static final Log LOG = LogFactory.getLog - (TimelineMetricAggregator.class); - - private final String checkpointLocation; - private final Long sleepIntervalMillis; - private final Integer checkpointCutOffMultiplier; - private final String hostAggregatorDisabledParam; - private final String tableName; - private final String outputTableName; - private final Long nativeTimeRangeDelay; - - public TimelineMetricAggregator(PhoenixHBaseAccessor hBaseAccessor, - Configuration metricsConf, - String checkpointLocation, - Long sleepIntervalMillis, - Integer checkpointCutOffMultiplier, - String hostAggregatorDisabledParam, - String tableName, - String outputTableName, - Long nativeTimeRangeDelay) { - super(hBaseAccessor, metricsConf); - this.checkpointLocation = checkpointLocation; - this.sleepIntervalMillis = sleepIntervalMillis; - this.checkpointCutOffMultiplier = checkpointCutOffMultiplier; - this.hostAggregatorDisabledParam = hostAggregatorDisabledParam; - this.tableName = tableName; - this.outputTableName = outputTableName; - this.nativeTimeRangeDelay = nativeTimeRangeDelay; - } - - @Override - protected String getCheckpointLocation() { - return checkpointLocation; - } - - @Override - protected void aggregate(ResultSet rs, long startTime, long endTime) - throws IOException, SQLException { - Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = - aggregateMetricsFromResultSet(rs); - - LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates."); - hBaseAccessor.saveHostAggregateRecords(hostAggregateMap, - outputTableName); - } - - @Override - protected Condition prepareMetricQueryCondition(long startTime, long endTime) { - Condition condition = new Condition(null, null, null, null, startTime, - endTime, null, true); - condition.setNoLimit(); - condition.setFetchSize(resultsetFetchSize); - condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, - PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay), - tableName)); - condition.addOrderByColumn("METRIC_NAME"); - condition.addOrderByColumn("HOSTNAME"); - condition.addOrderByColumn("APP_ID"); - condition.addOrderByColumn("INSTANCE_ID"); - condition.addOrderByColumn("SERVER_TIME"); - return condition; - } - - private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet - (ResultSet rs) throws IOException, SQLException { - TimelineMetric existingMetric = null; - MetricHostAggregate hostAggregate = null; - Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = - new HashMap<TimelineMetric, MetricHostAggregate>(); - - while (rs.next()) { - TimelineMetric currentMetric = - PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs); - MetricHostAggregate currentHostAggregate = - PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs); - - if (existingMetric == null) { - // First row - existingMetric = currentMetric; - hostAggregate = new MetricHostAggregate(); - hostAggregateMap.put(currentMetric, hostAggregate); - } - - if (existingMetric.equalsExceptTime(currentMetric)) { - // Recalculate totals with current metric - hostAggregate.updateAggregates(currentHostAggregate); - } else { - // Switched over to a new metric - save existing - create new aggregate - hostAggregate = new MetricHostAggregate(); - hostAggregate.updateAggregates(currentHostAggregate); - hostAggregateMap.put(currentMetric, hostAggregate); - existingMetric = currentMetric; - } - } - return hostAggregateMap; - } - - @Override - protected Long getSleepIntervalMillis() { - return sleepIntervalMillis; - } - - @Override - protected Integer getCheckpointCutOffMultiplier() { - return checkpointCutOffMultiplier; - } - - @Override - protected boolean isDisabled() { - return metricsConf.getBoolean(hostAggregatorDisabledParam, false); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java deleted file mode 100644 index 8b10079..0000000 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - -import static java.util.concurrent.TimeUnit.SECONDS; -import org.apache.commons.io.FilenameUtils; -import org.apache.hadoop.conf.Configuration; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; - -/** - * - */ -public class TimelineMetricAggregatorFactory { - private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE = - "timeline-metrics-host-aggregator-checkpoint"; - private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE = - "timeline-metrics-host-aggregator-hourly-checkpoint"; - - public static TimelineMetricAggregator createTimelineMetricAggregatorMinute - (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { - - String checkpointDir = metricsConf.get( - TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); - String checkpointLocation = FilenameUtils.concat(checkpointDir, - MINUTE_AGGREGATE_CHECKPOINT_FILE); - long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l)); // 5 mins - - int checkpointCutOffMultiplier = metricsConf.getInt - (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3); - String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED; - - String inputTableName = METRICS_RECORD_TABLE_NAME; - String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME; - - return new TimelineMetricAggregator(hBaseAccessor, metricsConf, - checkpointLocation, - sleepIntervalMillis, - checkpointCutOffMultiplier, - hostAggregatorDisabledParam, - inputTableName, - outputTableName, - 120000l); - } - - public static TimelineMetricAggregator createTimelineMetricAggregatorHourly - (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { - - String checkpointDir = metricsConf.get( - TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); - String checkpointLocation = FilenameUtils.concat(checkpointDir, - MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE); - long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l)); - - int checkpointCutOffMultiplier = metricsConf.getInt - (HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2); - String hostAggregatorDisabledParam = HOST_AGGREGATOR_HOUR_DISABLED; - - String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME; - String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME; - - return new TimelineMetricAggregator(hBaseAccessor, metricsConf, - checkpointLocation, - sleepIntervalMillis, - checkpointCutOffMultiplier, - hostAggregatorDisabledParam, - inputTableName, - outputTableName, - 3600000l); - } - - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java deleted file mode 100644 index 654c188..0000000 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java +++ /dev/null @@ -1,220 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - - -import org.apache.commons.io.FilenameUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import java.io.IOException; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; - -/** - * Aggregates a metric across all hosts in the cluster. Reads metrics from - * the precision table and saves into the aggregate. - */ -public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator { - private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregator.class); - private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE = - "timeline-metrics-cluster-aggregator-checkpoint"; - private final String checkpointLocation; - private final Long sleepIntervalMillis; - public final int timeSliceIntervalMillis; - private final Integer checkpointCutOffMultiplier; - - public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor, - Configuration metricsConf) { - super(hBaseAccessor, metricsConf); - - String checkpointDir = metricsConf.get( - TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); - - checkpointLocation = FilenameUtils.concat(checkpointDir, - CLUSTER_AGGREGATOR_CHECKPOINT_FILE); - - sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 120l)); - timeSliceIntervalMillis = (int)SECONDS.toMillis(metricsConf.getInt - (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 15)); - checkpointCutOffMultiplier = - metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2); - } - - @Override - protected String getCheckpointLocation() { - return checkpointLocation; - } - - @Override - protected void aggregate(ResultSet rs, long startTime, long endTime) - throws SQLException, IOException { - List<Long[]> timeSlices = getTimeSlices(startTime, endTime); - Map<TimelineClusterMetric, MetricClusterAggregate> - aggregateClusterMetrics = aggregateMetricsFromResultSet(rs, timeSlices); - - LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates."); - hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics); - } - - @Override - protected Condition prepareMetricQueryCondition(long startTime, long endTime) { - Condition condition = new Condition(null, null, null, null, startTime, - endTime, null, true); - condition.setNoLimit(); - condition.setFetchSize(resultsetFetchSize); - condition.setStatement(String.format(GET_METRIC_SQL, - PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), - METRICS_RECORD_TABLE_NAME)); - condition.addOrderByColumn("METRIC_NAME"); - condition.addOrderByColumn("APP_ID"); - condition.addOrderByColumn("INSTANCE_ID"); - condition.addOrderByColumn("SERVER_TIME"); - return condition; - } - - private List<Long[]> getTimeSlices(long startTime, long endTime) { - List<Long[]> timeSlices = new ArrayList<Long[]>(); - long sliceStartTime = startTime; - while (sliceStartTime < endTime) { - timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis}); - sliceStartTime += timeSliceIntervalMillis; - } - return timeSlices; - } - - private Map<TimelineClusterMetric, MetricClusterAggregate> - aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices) - throws SQLException, IOException { - Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = - new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); - // Create time slices - - while (rs.next()) { - TimelineMetric metric = - PhoenixHBaseAccessor.getTimelineMetricFromResultSet(rs); - - Map<TimelineClusterMetric, Double> clusterMetrics = - sliceFromTimelineMetric(metric, timeSlices); - - if (clusterMetrics != null && !clusterMetrics.isEmpty()) { - for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry : - clusterMetrics.entrySet()) { - TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey(); - MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric); - Double avgValue = clusterMetricEntry.getValue(); - - if (aggregate == null) { - aggregate = new MetricClusterAggregate(avgValue, 1, null, - avgValue, avgValue); - aggregateClusterMetrics.put(clusterMetric, aggregate); - } else { - aggregate.updateSum(avgValue); - aggregate.updateNumberOfHosts(1); - aggregate.updateMax(avgValue); - aggregate.updateMin(avgValue); - } - } - } - } - return aggregateClusterMetrics; - } - - @Override - protected Long getSleepIntervalMillis() { - return sleepIntervalMillis; - } - - @Override - protected Integer getCheckpointCutOffMultiplier() { - return checkpointCutOffMultiplier; - } - - @Override - protected boolean isDisabled() { - return metricsConf.getBoolean(CLUSTER_AGGREGATOR_MINUTE_DISABLED, false); - } - - private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric( - TimelineMetric timelineMetric, List<Long[]> timeSlices) { - - if (timelineMetric.getMetricValues().isEmpty()) { - return null; - } - - Map<TimelineClusterMetric, Double> timelineClusterMetricMap = - new HashMap<TimelineClusterMetric, Double>(); - - for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) { - // TODO: investigate null values - pre filter - if (metric.getValue() == null) { - continue; - } - Long timestamp = getSliceTimeForMetric(timeSlices, - Long.parseLong(metric.getKey().toString())); - if (timestamp != -1) { - // Metric is within desired time range - TimelineClusterMetric clusterMetric = new TimelineClusterMetric( - timelineMetric.getMetricName(), - timelineMetric.getAppId(), - timelineMetric.getInstanceId(), - timestamp, - timelineMetric.getType()); - if (!timelineClusterMetricMap.containsKey(clusterMetric)) { - timelineClusterMetricMap.put(clusterMetric, metric.getValue()); - } else { - Double oldValue = timelineClusterMetricMap.get(clusterMetric); - Double newValue = (oldValue + metric.getValue()) / 2; - timelineClusterMetricMap.put(clusterMetric, newValue); - } - } - } - - return timelineClusterMetricMap; - } - - /** - * Return beginning of the time slice into which the metric fits. - */ - private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) { - for (Long[] timeSlice : timeSlices) { - if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) { - return timeSlice[0]; - } - } - return -1l; - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java deleted file mode 100644 index 7764ea3..0000000 --- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics - .timeline; - -import org.apache.commons.io.FilenameUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import java.io.IOException; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.HashMap; -import java.util.Map; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; - -public class TimelineMetricClusterAggregatorHourly extends - AbstractTimelineAggregator { - private static final Log LOG = LogFactory.getLog - (TimelineMetricClusterAggregatorHourly.class); - private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE = - "timeline-metrics-cluster-aggregator-hourly-checkpoint"; - private final String checkpointLocation; - private final long sleepIntervalMillis; - private final Integer checkpointCutOffMultiplier; - private long checkpointCutOffIntervalMillis; - private static final Long NATIVE_TIME_RANGE_DELTA = 3600000l; // 1 hour - - public TimelineMetricClusterAggregatorHourly( - PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { - super(hBaseAccessor, metricsConf); - - String checkpointDir = metricsConf.get( - TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); - - checkpointLocation = FilenameUtils.concat(checkpointDir, - CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE); - - sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l)); - checkpointCutOffIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL, 7200l)); - checkpointCutOffMultiplier = metricsConf.getInt - (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2); - } - - @Override - protected String getCheckpointLocation() { - return checkpointLocation; - } - - @Override - protected void aggregate(ResultSet rs, long startTime, long endTime) - throws SQLException, IOException { - Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = - aggregateMetricsFromResultSet(rs); - - LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates."); - hBaseAccessor.saveClusterAggregateHourlyRecords(hostAggregateMap, - METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME); - } - - @Override - protected Condition prepareMetricQueryCondition(long startTime, - long endTime) { - Condition condition = new Condition(null, null, null, null, startTime, - endTime, null, true); - condition.setNoLimit(); - condition.setFetchSize(resultsetFetchSize); - condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, - PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA))); - condition.addOrderByColumn("METRIC_NAME"); - condition.addOrderByColumn("APP_ID"); - condition.addOrderByColumn("INSTANCE_ID"); - condition.addOrderByColumn("SERVER_TIME"); - return condition; - } - - private Map<TimelineClusterMetric, MetricHostAggregate> - aggregateMetricsFromResultSet(ResultSet rs) throws IOException, SQLException { - - TimelineClusterMetric existingMetric = null; - MetricHostAggregate hostAggregate = null; - Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = - new HashMap<TimelineClusterMetric, MetricHostAggregate>(); - - while (rs.next()) { - TimelineClusterMetric currentMetric = - getTimelineMetricClusterKeyFromResultSet(rs); - MetricClusterAggregate currentHostAggregate = - getMetricClusterAggregateFromResultSet(rs); - - if (existingMetric == null) { - // First row - existingMetric = currentMetric; - hostAggregate = new MetricHostAggregate(); - hostAggregateMap.put(currentMetric, hostAggregate); - } - - if (existingMetric.equalsExceptTime(currentMetric)) { - // Recalculate totals with current metric - updateAggregatesFromHost(hostAggregate, currentHostAggregate); - - } else { - // Switched over to a new metric - save existing - hostAggregate = new MetricHostAggregate(); - updateAggregatesFromHost(hostAggregate, currentHostAggregate); - hostAggregateMap.put(currentMetric, hostAggregate); - existingMetric = currentMetric; - } - - } - - return hostAggregateMap; - } - - private void updateAggregatesFromHost( - MetricHostAggregate agg, - MetricClusterAggregate currentClusterAggregate) { - agg.updateMax(currentClusterAggregate.getMax()); - agg.updateMin(currentClusterAggregate.getMin()); - agg.updateSum(currentClusterAggregate.getSum()); - agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts()); - } - - @Override - protected Long getSleepIntervalMillis() { - return sleepIntervalMillis; - } - - @Override - protected Integer getCheckpointCutOffMultiplier() { - return checkpointCutOffMultiplier; - } - - @Override - protected Long getCheckpointCutOffIntervalMillis() { - return checkpointCutOffIntervalMillis; - } - - @Override - protected boolean isDisabled() { - return metricsConf.getBoolean(CLUSTER_AGGREGATOR_HOUR_DISABLED, false); - } - - -}