http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetricReader.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetricReader.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetricReader.java deleted file mode 100644 index ea0913e..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetricReader.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - -import java.sql.ResultSet; -import java.sql.SQLException; - -public class TimelineClusterMetricReader { - - private boolean ignoreInstance; - - public TimelineClusterMetricReader(boolean ignoreInstance) { - this.ignoreInstance = ignoreInstance; - } - - public TimelineClusterMetric fromResultSet(ResultSet rs) - throws SQLException { - - return new TimelineClusterMetric( - rs.getString("METRIC_NAME"), - rs.getString("APP_ID"), - ignoreInstance ? null : rs.getString("INSTANCE_ID"), - rs.getLong("SERVER_TIME"), - rs.getString("UNITS")); - } -} -
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java deleted file mode 100644 index 459e612..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import java.io.IOException; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.HashMap; -import java.util.Map; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL; - -public class TimelineMetricAggregator extends AbstractTimelineAggregator { - private static final Log LOG = LogFactory.getLog - (TimelineMetricAggregator.class); - - private final String checkpointLocation; - private final Long sleepIntervalMillis; - private final Integer checkpointCutOffMultiplier; - private final String hostAggregatorDisabledParam; - private final String tableName; - private final String outputTableName; - private final Long nativeTimeRangeDelay; - - public TimelineMetricAggregator(PhoenixHBaseAccessor hBaseAccessor, - Configuration metricsConf, - String checkpointLocation, - Long sleepIntervalMillis, - Integer checkpointCutOffMultiplier, - String hostAggregatorDisabledParam, - String tableName, - String outputTableName, - Long nativeTimeRangeDelay) { - super(hBaseAccessor, metricsConf); - this.checkpointLocation = checkpointLocation; - this.sleepIntervalMillis = sleepIntervalMillis; - this.checkpointCutOffMultiplier = checkpointCutOffMultiplier; - this.hostAggregatorDisabledParam = hostAggregatorDisabledParam; - this.tableName = tableName; - this.outputTableName = outputTableName; - this.nativeTimeRangeDelay = nativeTimeRangeDelay; - } - - @Override - 
protected String getCheckpointLocation() { - return checkpointLocation; - } - - @Override - protected void aggregate(ResultSet rs, long startTime, long endTime) - throws IOException, SQLException { - Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = - aggregateMetricsFromResultSet(rs); - - LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates."); - hBaseAccessor.saveHostAggregateRecords(hostAggregateMap, - outputTableName); - } - - @Override - protected Condition prepareMetricQueryCondition(long startTime, long endTime) { - Condition condition = new DefaultCondition(null, null, null, null, startTime, - endTime, null, null, true); - condition.setNoLimit(); - condition.setFetchSize(resultsetFetchSize); - condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, - PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay), - tableName)); - condition.addOrderByColumn("METRIC_NAME"); - condition.addOrderByColumn("HOSTNAME"); - condition.addOrderByColumn("APP_ID"); - condition.addOrderByColumn("INSTANCE_ID"); - condition.addOrderByColumn("SERVER_TIME"); - return condition; - } - - private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet - (ResultSet rs) throws IOException, SQLException { - TimelineMetric existingMetric = null; - MetricHostAggregate hostAggregate = null; - Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = - new HashMap<TimelineMetric, MetricHostAggregate>(); - - while (rs.next()) { - TimelineMetric currentMetric = - PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs); - MetricHostAggregate currentHostAggregate = - PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs); - - if (existingMetric == null) { - // First row - existingMetric = currentMetric; - hostAggregate = new MetricHostAggregate(); - hostAggregateMap.put(currentMetric, hostAggregate); - } - - if (existingMetric.equalsExceptTime(currentMetric)) { - // Recalculate totals with current metric - 
hostAggregate.updateAggregates(currentHostAggregate); - } else { - // Switched over to a new metric - save existing - create new aggregate - hostAggregate = new MetricHostAggregate(); - hostAggregate.updateAggregates(currentHostAggregate); - hostAggregateMap.put(currentMetric, hostAggregate); - existingMetric = currentMetric; - } - } - return hostAggregateMap; - } - - @Override - protected Long getSleepIntervalMillis() { - return sleepIntervalMillis; - } - - @Override - protected Integer getCheckpointCutOffMultiplier() { - return checkpointCutOffMultiplier; - } - - @Override - protected boolean isDisabled() { - return metricsConf.getBoolean(hostAggregatorDisabledParam, false); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java deleted file mode 100644 index 1dfd3e6..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - -import org.apache.commons.io.FilenameUtils; -import org.apache.hadoop.conf.Configuration; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER; -import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; - -/** - * - */ -public class TimelineMetricAggregatorFactory { - private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE = - "timeline-metrics-host-aggregator-checkpoint"; - private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE = - "timeline-metrics-host-aggregator-hourly-checkpoint"; - - public static TimelineMetricAggregator createTimelineMetricAggregatorMinute - (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { - - String checkpointDir = metricsConf.get( - TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); - String checkpointLocation = FilenameUtils.concat(checkpointDir, - MINUTE_AGGREGATE_CHECKPOINT_FILE); - long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l)); // 5 mins - - int checkpointCutOffMultiplier = metricsConf.getInt - (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3); - String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED; - - String inputTableName = METRICS_RECORD_TABLE_NAME; - String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME; - - return new TimelineMetricAggregator(hBaseAccessor, metricsConf, - checkpointLocation, - sleepIntervalMillis, - checkpointCutOffMultiplier, - hostAggregatorDisabledParam, - inputTableName, - outputTableName, - 120000l); - } - - public static TimelineMetricAggregator createTimelineMetricAggregatorHourly - (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { - - String checkpointDir = metricsConf.get( - 
TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); - String checkpointLocation = FilenameUtils.concat(checkpointDir, - MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE); - long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l)); - - int checkpointCutOffMultiplier = metricsConf.getInt - (HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2); - String hostAggregatorDisabledParam = HOST_AGGREGATOR_HOUR_DISABLED; - - String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME; - String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME; - - return new TimelineMetricAggregator(hBaseAccessor, metricsConf, - checkpointLocation, - sleepIntervalMillis, - checkpointCutOffMultiplier, - hostAggregatorDisabledParam, - inputTableName, - outputTableName, - 3600000l); - } - - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java deleted file mode 100644 index f595d5e..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java +++ /dev/null @@ -1,223 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - - -import org.apache.commons.io.FilenameUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import java.io.IOException; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER; -import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; - -/** - * Aggregates a metric across all hosts in the cluster. Reads metrics from - * the precision table and saves into the aggregate. - */ -public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator { - private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregator.class); - private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE = - "timeline-metrics-cluster-aggregator-checkpoint"; - private final String checkpointLocation; - private final Long sleepIntervalMillis; - public final int timeSliceIntervalMillis; - private final Integer checkpointCutOffMultiplier; - private TimelineMetricReader timelineMetricReader = - new TimelineMetricReader(true); - - public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor, - Configuration metricsConf) { - super(hBaseAccessor, metricsConf); - - String checkpointDir = metricsConf.get( - TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); - - checkpointLocation = FilenameUtils.concat(checkpointDir, - CLUSTER_AGGREGATOR_CHECKPOINT_FILE); - - sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 120l)); - timeSliceIntervalMillis = 
(int)SECONDS.toMillis(metricsConf.getInt - (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 15)); - checkpointCutOffMultiplier = - metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2); - } - - @Override - protected String getCheckpointLocation() { - return checkpointLocation; - } - - @Override - protected void aggregate(ResultSet rs, long startTime, long endTime) - throws SQLException, IOException { - List<Long[]> timeSlices = getTimeSlices(startTime, endTime); - Map<TimelineClusterMetric, MetricClusterAggregate> - aggregateClusterMetrics = aggregateMetricsFromResultSet(rs, timeSlices); - - LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates."); - hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics); - } - - @Override - protected Condition prepareMetricQueryCondition(long startTime, long endTime) { - Condition condition = new DefaultCondition(null, null, null, null, startTime, - endTime, null, null, true); - condition.setNoLimit(); - condition.setFetchSize(resultsetFetchSize); - condition.setStatement(String.format(GET_METRIC_SQL, - PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), - METRICS_RECORD_TABLE_NAME)); - condition.addOrderByColumn("METRIC_NAME"); - condition.addOrderByColumn("APP_ID"); - condition.addOrderByColumn("INSTANCE_ID"); - condition.addOrderByColumn("SERVER_TIME"); - return condition; - } - - private List<Long[]> getTimeSlices(long startTime, long endTime) { - List<Long[]> timeSlices = new ArrayList<Long[]>(); - long sliceStartTime = startTime; - while (sliceStartTime < endTime) { - timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis}); - sliceStartTime += timeSliceIntervalMillis; - } - return timeSlices; - } - - private Map<TimelineClusterMetric, MetricClusterAggregate> - aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices) - throws SQLException, IOException { - Map<TimelineClusterMetric, MetricClusterAggregate> 
aggregateClusterMetrics = - new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); - // Create time slices - - while (rs.next()) { - TimelineMetric metric = - timelineMetricReader.getTimelineMetricFromResultSet(rs); - - Map<TimelineClusterMetric, Double> clusterMetrics = - sliceFromTimelineMetric(metric, timeSlices); - - if (clusterMetrics != null && !clusterMetrics.isEmpty()) { - for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry : - clusterMetrics.entrySet()) { - TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey(); - MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric); - Double avgValue = clusterMetricEntry.getValue(); - - if (aggregate == null) { - aggregate = new MetricClusterAggregate(avgValue, 1, null, - avgValue, avgValue); - aggregateClusterMetrics.put(clusterMetric, aggregate); - } else { - aggregate.updateSum(avgValue); - aggregate.updateNumberOfHosts(1); - aggregate.updateMax(avgValue); - aggregate.updateMin(avgValue); - } - } - } - } - return aggregateClusterMetrics; - } - - @Override - protected Long getSleepIntervalMillis() { - return sleepIntervalMillis; - } - - @Override - protected Integer getCheckpointCutOffMultiplier() { - return checkpointCutOffMultiplier; - } - - @Override - protected boolean isDisabled() { - return metricsConf.getBoolean(CLUSTER_AGGREGATOR_MINUTE_DISABLED, false); - } - - private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric( - TimelineMetric timelineMetric, List<Long[]> timeSlices) { - - if (timelineMetric.getMetricValues().isEmpty()) { - return null; - } - - Map<TimelineClusterMetric, Double> timelineClusterMetricMap = - new HashMap<TimelineClusterMetric, Double>(); - - for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) { - // TODO: investigate null values - pre filter - if (metric.getValue() == null) { - continue; - } - Long timestamp = getSliceTimeForMetric(timeSlices, - Long.parseLong(metric.getKey().toString())); - 
if (timestamp != -1) { - // Metric is within desired time range - TimelineClusterMetric clusterMetric = new TimelineClusterMetric( - timelineMetric.getMetricName(), - timelineMetric.getAppId(), - timelineMetric.getInstanceId(), - timestamp, - timelineMetric.getType()); - if (!timelineClusterMetricMap.containsKey(clusterMetric)) { - timelineClusterMetricMap.put(clusterMetric, metric.getValue()); - } else { - Double oldValue = timelineClusterMetricMap.get(clusterMetric); - Double newValue = (oldValue + metric.getValue()) / 2; - timelineClusterMetricMap.put(clusterMetric, newValue); - } - } - } - - return timelineClusterMetricMap; - } - - /** - * Return beginning of the time slice into which the metric fits. - */ - private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) { - for (Long[] timeSlice : timeSlices) { - if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) { - return timeSlice[0]; - } - } - return -1l; - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java deleted file mode 100644 index c76ec60..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java +++ /dev/null @@ -1,177 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics - .timeline; - -import org.apache.commons.io.FilenameUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import java.io.IOException; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.HashMap; -import java.util.Map; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; 
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; - -public class TimelineMetricClusterAggregatorHourly extends - AbstractTimelineAggregator { - private static final Log LOG = LogFactory.getLog - (TimelineMetricClusterAggregatorHourly.class); - private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE = - "timeline-metrics-cluster-aggregator-hourly-checkpoint"; - private final String checkpointLocation; - private final long sleepIntervalMillis; - private final Integer checkpointCutOffMultiplier; - private long checkpointCutOffIntervalMillis; - private static final Long NATIVE_TIME_RANGE_DELTA = 3600000l; // 1 hour - private final TimelineClusterMetricReader timelineClusterMetricReader - = new TimelineClusterMetricReader(true); - - public TimelineMetricClusterAggregatorHourly( - PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { - super(hBaseAccessor, metricsConf); - - String checkpointDir = metricsConf.get( - TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); - - checkpointLocation = FilenameUtils.concat(checkpointDir, - 
CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE); - - sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l)); - checkpointCutOffIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL, 7200l)); - checkpointCutOffMultiplier = metricsConf.getInt - (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2); - } - - @Override - protected String getCheckpointLocation() { - return checkpointLocation; - } - - @Override - protected void aggregate(ResultSet rs, long startTime, long endTime) - throws SQLException, IOException { - Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = - aggregateMetricsFromResultSet(rs); - - LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates."); - hBaseAccessor.saveClusterAggregateHourlyRecords(hostAggregateMap, - METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME); - } - - @Override - protected Condition prepareMetricQueryCondition(long startTime, - long endTime) { - Condition condition = new DefaultCondition(null, null, null, null, startTime, - endTime, null, null, true); - condition.setNoLimit(); - condition.setFetchSize(resultsetFetchSize); - condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, - PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), - METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); - condition.addOrderByColumn("METRIC_NAME"); - condition.addOrderByColumn("APP_ID"); - condition.addOrderByColumn("INSTANCE_ID"); - condition.addOrderByColumn("SERVER_TIME"); - return condition; - } - - private Map<TimelineClusterMetric, MetricHostAggregate> - aggregateMetricsFromResultSet(ResultSet rs) throws IOException, SQLException { - - TimelineClusterMetric existingMetric = null; - MetricHostAggregate hostAggregate = null; - Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = - new HashMap<TimelineClusterMetric, MetricHostAggregate>(); - - while (rs.next()) { - 
TimelineClusterMetric currentMetric = - timelineClusterMetricReader.fromResultSet(rs); - MetricClusterAggregate currentHostAggregate = - getMetricClusterAggregateFromResultSet(rs); - - if (existingMetric == null) { - // First row - existingMetric = currentMetric; - hostAggregate = new MetricHostAggregate(); - hostAggregateMap.put(currentMetric, hostAggregate); - } - - if (existingMetric.equalsExceptTime(currentMetric)) { - // Recalculate totals with current metric - updateAggregatesFromHost(hostAggregate, currentHostAggregate); - - } else { - // Switched over to a new metric - save existing - hostAggregate = new MetricHostAggregate(); - updateAggregatesFromHost(hostAggregate, currentHostAggregate); - hostAggregateMap.put(currentMetric, hostAggregate); - existingMetric = currentMetric; - } - - } - - return hostAggregateMap; - } - - private void updateAggregatesFromHost( - MetricHostAggregate agg, - MetricClusterAggregate currentClusterAggregate) { - agg.updateMax(currentClusterAggregate.getMax()); - agg.updateMin(currentClusterAggregate.getMin()); - agg.updateSum(currentClusterAggregate.getSum()); - agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts()); - } - - @Override - protected Long getSleepIntervalMillis() { - return sleepIntervalMillis; - } - - @Override - protected Integer getCheckpointCutOffMultiplier() { - return checkpointCutOffMultiplier; - } - - @Override - protected Long getCheckpointCutOffIntervalMillis() { - return checkpointCutOffIntervalMillis; - } - - @Override - protected boolean isDisabled() { - return metricsConf.getBoolean(CLUSTER_AGGREGATOR_HOUR_DISABLED, false); - } - - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java index 248894c..b72aa64 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -128,6 +129,11 @@ public class TimelineMetricConfiguration { public static final String TIMELINE_SERVICE_RPC_ADDRESS = "timeline.metrics.service.rpc.address"; + public static final String CLUSTER_AGGREGATOR_APP_IDS = + "timeline.metrics.service.cluster.aggregator.appIds"; + + public static final String HOST_APP_ID = "HOST"; + private Configuration hbaseConf; private Configuration metricsConf; private volatile boolean isInitialized = false; http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricReader.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricReader.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricReader.java deleted file mode 100644 index aa349f0..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricReader.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - - -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; - -import java.io.IOException; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.Map; -import java.util.TreeMap; - -public class TimelineMetricReader { - - private boolean ignoreInstance = false; - - public TimelineMetricReader() {} - - public TimelineMetricReader(boolean ignoreInstance) { - this.ignoreInstance = ignoreInstance; - } - - public TimelineMetric getTimelineMetricFromResultSet(ResultSet rs) - throws SQLException, IOException { - TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs); - Map<Long, Double> sortedByTimeMetrics = new TreeMap<Long, Double>( - PhoenixHBaseAccessor.readMetricFromJSON(rs.getString("METRICS"))); - metric.setMetricValues(sortedByTimeMetrics); - return metric; - } - - /** - * Returns common part of timeline metrics record without the values. - */ - public TimelineMetric getTimelineMetricCommonsFromResultSet(ResultSet rs) - throws SQLException { - TimelineMetric metric = new TimelineMetric(); - metric.setMetricName(rs.getString("METRIC_NAME")); - metric.setAppId(rs.getString("APP_ID")); - if (!ignoreInstance) metric.setInstanceId(rs.getString("INSTANCE_ID")); - metric.setHostName(rs.getString("HOSTNAME")); - metric.setTimestamp(rs.getLong("SERVER_TIME")); - metric.setStartTime(rs.getLong("START_TIME")); - metric.setType(rs.getString("UNITS")); - return metric; - } - -} - http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java new file mode 100644 index 0000000..8dea46e --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Date; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE; + +public abstract class AbstractTimelineAggregator implements Runnable { + protected final PhoenixHBaseAccessor hBaseAccessor; + private final Log LOG; + + private Clock clock; + protected final long checkpointDelayMillis; + protected final Integer resultsetFetchSize; + protected Configuration metricsConf; + + public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor, + Configuration metricsConf) { + this(hBaseAccessor, metricsConf, new SystemClock()); + } + + public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor, + Configuration metricsConf, Clock clk) { + this.hBaseAccessor = hBaseAccessor; + this.metricsConf = metricsConf; + this.checkpointDelayMillis = SECONDS.toMillis( + metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120)); + 
this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000); + this.LOG = LogFactory.getLog(this.getClass()); + this.clock = clk; + } + + @Override + public void run() { + LOG.info("Started Timeline aggregator thread @ " + new Date()); + Long SLEEP_INTERVAL = getSleepIntervalMillis(); + + while (true) { + long sleepTime = runOnce(SLEEP_INTERVAL); + + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + LOG.info("Sleep interrupted, continuing with aggregation."); + } + } + } + + /** + * Access relaxed for tests + */ + public long runOnce(Long SLEEP_INTERVAL) { + long currentTime = clock.getTime(); + long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime); + long sleepTime = SLEEP_INTERVAL; + + if (lastCheckPointTime != -1) { + LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: " + + ((clock.getTime() - lastCheckPointTime) / 1000) + + " seconds."); + + long startTime = clock.getTime(); + boolean success = doWork(lastCheckPointTime, + lastCheckPointTime + SLEEP_INTERVAL); + long executionTime = clock.getTime() - startTime; + long delta = SLEEP_INTERVAL - executionTime; + + if (delta > 0) { + // Sleep for (configured sleep - time to execute task) + sleepTime = delta; + } else { + // No sleep because last run took too long to execute + LOG.info("Aggregator execution took too long, " + + "cancelling sleep. executionTime = " + executionTime); + sleepTime = 1; + } + + LOG.debug("Aggregator sleep interval = " + sleepTime); + + if (success) { + try { + // Comment to bug fix: + // cannot just save lastCheckPointTime + SLEEP_INTERVAL, + // it has to be verified so it is not a time in the future + // checkpoint says what was aggregated, and there is no way + // the future metrics were aggregated! 
+ saveCheckPoint(Math.min(currentTime, lastCheckPointTime + + SLEEP_INTERVAL)); + } catch (IOException io) { + LOG.warn("Error saving checkpoint, restarting aggregation at " + + "previous checkpoint."); + } + } + } + + return sleepTime; + } + + private long readLastCheckpointSavingOnFirstRun(long currentTime) { + long lastCheckPointTime = -1; + + try { + lastCheckPointTime = readCheckPoint(); + if (isLastCheckPointTooOld(lastCheckPointTime)) { + LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " + + "lastCheckPointTime = " + lastCheckPointTime); + lastCheckPointTime = -1; + } + if (lastCheckPointTime == -1) { + // Assuming first run, save checkpoint and sleep. + // Set checkpoint to 2 minutes in the past to allow the + // agents/collectors to catch up + LOG.info("Saving checkpoint time on first run." + + (currentTime - checkpointDelayMillis)); + saveCheckPoint(currentTime - checkpointDelayMillis); + } + } catch (IOException io) { + LOG.warn("Unable to write last checkpoint time. 
Resuming sleep.", io); + } + return lastCheckPointTime; + } + + private boolean isLastCheckPointTooOld(long checkpoint) { + // first checkpoint is saved checkpointDelayMillis in the past, + // so here we also need to take it into account + return checkpoint != -1 && + ((clock.getTime() - checkpoint - checkpointDelayMillis) > + getCheckpointCutOffIntervalMillis()); + } + + protected long readCheckPoint() { + try { + File checkpoint = new File(getCheckpointLocation()); + if (checkpoint.exists()) { + String contents = FileUtils.readFileToString(checkpoint); + if (contents != null && !contents.isEmpty()) { + return Long.parseLong(contents); + } + } + } catch (IOException io) { + LOG.debug(io); + } + return -1; + } + + protected void saveCheckPoint(long checkpointTime) throws IOException { + File checkpoint = new File(getCheckpointLocation()); + if (!checkpoint.exists()) { + boolean done = checkpoint.createNewFile(); + if (!done) { + throw new IOException("Could not create checkpoint at location, " + + getCheckpointLocation()); + } + } + FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime)); + } + + /** + * Read metrics written during the time interval and save the sum and total + * in the aggregate table. + * + * @param startTime Sample start time + * @param endTime Sample end time + */ + public boolean doWork(long startTime, long endTime) { + LOG.info("Start aggregation cycle @ " + new Date() + ", " + + "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime)); + + boolean success = true; + Condition condition = prepareMetricQueryCondition(startTime, endTime); + + Connection conn = null; + PreparedStatement stmt = null; + ResultSet rs = null; + + try { + conn = hBaseAccessor.getConnection(); + // FLUME 2. 
aggregate and ignore the instance + stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + + LOG.debug("Query issued @: " + new Date()); + rs = stmt.executeQuery(); + LOG.debug("Query returned @: " + new Date()); + + aggregate(rs, startTime, endTime); + LOG.info("End aggregation cycle @ " + new Date()); + + } catch (SQLException e) { + LOG.error("Exception during aggregating metrics.", e); + success = false; + } catch (IOException e) { + LOG.error("Exception during aggregating metrics.", e); + success = false; + } finally { + if (rs != null) { + try { + rs.close(); + } catch (SQLException e) { + // Ignore + } + } + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } + } + } + + LOG.info("End aggregation cycle @ " + new Date()); + return success; + } + + protected abstract Condition prepareMetricQueryCondition(long startTime, long endTime); + + protected abstract void aggregate(ResultSet rs, long startTime, long endTime) + throws IOException, SQLException; + + protected abstract Long getSleepIntervalMillis(); + + protected abstract Integer getCheckpointCutOffMultiplier(); + + protected Long getCheckpointCutOffIntervalMillis() { + return getCheckpointCutOffMultiplier() * getSleepIntervalMillis(); + } + + public abstract boolean isDisabled(); + + protected abstract String getCheckpointLocation(); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java new file mode 100644 index 0000000..ce79b6f --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; + + +import java.util.Map; + +/** + * + */ +public class AggregatorUtils { + + public static double[] calculateAggregates(Map<Long, Double> metricValues) { + double[] values = new double[4]; + double max = Double.MIN_VALUE; + double min = Double.MAX_VALUE; + double sum = 0.0; + int metricCount = 0; + + if (metricValues != null && !metricValues.isEmpty()) { + for (Double value : metricValues.values()) { + // TODO: Some nulls in data - need to investigate null values from host + if (value != null) { + if (value > max) { + max = value; + } + if (value < min) { + min = value; + } + sum += value; + } + } + metricCount = metricValues.values().size(); + } + // BR: WHY ZERO is a good idea? + values[0] = sum; + values[1] = max != Double.MIN_VALUE ? max : 0.0; + values[2] = min != Double.MAX_VALUE ? min : 0.0; + values[3] = metricCount; + + return values; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java new file mode 100644 index 0000000..9d17fca --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; + +/** + * Is used to determine metrics aggregate table. + * + * @see org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.TimelineWebServices#getTimelineMetric + * @see org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.TimelineWebServices#getTimelineMetrics + */ +public class Function { + public static Function DEFAULT_VALUE_FUNCTION = + new Function(ReadFunction.VALUE, null); + private static final String SUFFIX_SEPARATOR = "\\._"; + + private ReadFunction readFunction = ReadFunction.VALUE; + private PostProcessingFunction postProcessingFunction = null; + + public Function(){ + + } + + public Function(ReadFunction readFunction, + PostProcessingFunction ppFunction){ + if (readFunction!=null){ + this.readFunction = readFunction ; + } + this.postProcessingFunction = ppFunction; + } + + public static Function fromMetricName(String metricName){ + // gets postprocessing, and aggregation function + // ex. 
Metric._rate._avg + String[] parts = metricName.split(SUFFIX_SEPARATOR); + + ReadFunction readFunction = ReadFunction.VALUE; + PostProcessingFunction ppFunction = null; + + if (parts.length == 3) { + ppFunction = PostProcessingFunction.getFunction(parts[1]); + readFunction = ReadFunction.getFunction(parts[2]); + } else if (parts.length == 2) { + ppFunction = null; + readFunction = ReadFunction.getFunction(parts[1]); + } + + + return new Function(readFunction, ppFunction); + } + + public String getSuffix(){ + return (postProcessingFunction == null)? readFunction.getSuffix() : + postProcessingFunction.getSuffix() + readFunction.getSuffix(); + } + + public ReadFunction getReadFunction() { + return readFunction; + } + + @Override + public String toString() { + return "Function{" + + "readFunction=" + readFunction + + ", postProcessingFunction=" + postProcessingFunction + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof Function)) return false; + + Function function = (Function) o; + + return postProcessingFunction == function.postProcessingFunction + && readFunction == function.readFunction; + + } + + @Override + public int hashCode() { + int result = readFunction.hashCode(); + result = 31 * result + (postProcessingFunction != null ? 
+ postProcessingFunction.hashCode() : 0); + return result; + } + + public enum PostProcessingFunction { + NONE(""), + RATE("._rate"); + + PostProcessingFunction(String suffix){ + this.suffix = suffix; + } + + private String suffix = ""; + + public String getSuffix(){ + return suffix; + } + + public static PostProcessingFunction getFunction(String functionName) throws + FunctionFormatException { + if (functionName == null) { + return NONE; + } + + try { + return PostProcessingFunction.valueOf(functionName.toUpperCase()); + } catch (IllegalArgumentException e) { + throw new FunctionFormatException("Function should be value, avg, min, " + + "max", e); + } + } + } + + public enum ReadFunction { + VALUE(""), + AVG("._avg"), + MIN("._min"), + MAX("._max"), + SUM("._sum"); + + private final String suffix; + + ReadFunction(String suffix){ + this.suffix = suffix; + } + + public String getSuffix() { + return suffix; + } + + public static ReadFunction getFunction(String functionName) throws + FunctionFormatException { + if (functionName == null) { + return VALUE; + } + try { + return ReadFunction.valueOf(functionName.toUpperCase()); + } catch (IllegalArgumentException e) { + throw new FunctionFormatException( + "Function should be value, avg, min, max. 
Got " + functionName, e); + } + } + } + + public static class FunctionFormatException extends IllegalArgumentException { + public FunctionFormatException(String message, Throwable cause) { + super(message, cause); + } + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java new file mode 100644 index 0000000..825ac25 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; + + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.annotate.JsonSubTypes; +import org.codehaus.jackson.map.ObjectMapper; + +import java.io.IOException; + +/** +* +*/ +@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class), + @JsonSubTypes.Type(value = MetricHostAggregate.class)}) +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class MetricAggregate { + private static final ObjectMapper mapper = new ObjectMapper(); + + protected Double sum = 0.0; + protected Double deviation; + protected Double max = Double.MIN_VALUE; + protected Double min = Double.MAX_VALUE; + + public MetricAggregate() { + } + + MetricAggregate(Double sum, Double deviation, Double max, + Double min) { + this.sum = sum; + this.deviation = deviation; + this.max = max; + this.min = min; + } + + public void updateSum(Double sum) { + this.sum += sum; + } + + public void updateMax(Double max) { + if (max > this.max) { + this.max = max; + } + } + + public void updateMin(Double min) { + if (min < this.min) { + this.min = min; + } + } + + @JsonProperty("sum") + public Double getSum() { + return sum; + } + + @JsonProperty("deviation") + public Double getDeviation() { + return deviation; + } + + @JsonProperty("max") + public Double getMax() { + return max; + } + + @JsonProperty("min") + public Double getMin() { + return min; + } + + public void setSum(Double sum) { + this.sum = sum; + } + + public void setDeviation(Double deviation) { + this.deviation = deviation; + } + + public void setMax(Double max) { + this.max = max; + } + + public void setMin(Double min) { + this.min = min; + } + + public String toJSON() throws IOException { + return mapper.writeValueAsString(this); + } +} 
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java new file mode 100644 index 0000000..9c837b6 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; + + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +/** +* +*/ +public class MetricClusterAggregate extends MetricAggregate { + private int numberOfHosts; + + @JsonCreator + public MetricClusterAggregate() { + } + + public MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation, + Double max, Double min) { + super(sum, deviation, max, min); + this.numberOfHosts = numberOfHosts; + } + + @JsonProperty("numberOfHosts") + public int getNumberOfHosts() { + return numberOfHosts; + } + + public void updateNumberOfHosts(int count) { + this.numberOfHosts += count; + } + + public void setNumberOfHosts(int numberOfHosts) { + this.numberOfHosts = numberOfHosts; + } + + /** + * Find and update min, max and avg for a minute + */ + public void updateAggregates(MetricClusterAggregate hostAggregate) { + updateMax(hostAggregate.getMax()); + updateMin(hostAggregate.getMin()); + updateSum(hostAggregate.getSum()); + updateNumberOfHosts(hostAggregate.getNumberOfHosts()); + } + + @Override + public String toString() { + return "MetricAggregate{" + + "sum=" + sum + + ", numberOfHosts=" + numberOfHosts + + ", deviation=" + deviation + + ", max=" + max + + ", min=" + min + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java new file mode 100644 index 0000000..340ec75 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; + + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * Represents a collection of minute based aggregation of values for + * resolution greater than a minute. 
+ */ +public class MetricHostAggregate extends MetricAggregate { + + private long numberOfSamples = 0; + + @JsonCreator + public MetricHostAggregate() { + super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE); + } + + public MetricHostAggregate(Double sum, int numberOfSamples, + Double deviation, + Double max, Double min) { + super(sum, deviation, max, min); + this.numberOfSamples = numberOfSamples; + } + + @JsonProperty("numberOfSamples") + public long getNumberOfSamples() { + return numberOfSamples == 0 ? 1 : numberOfSamples; + } + + public void updateNumberOfSamples(long count) { + this.numberOfSamples += count; + } + + public void setNumberOfSamples(long numberOfSamples) { + this.numberOfSamples = numberOfSamples; + } + + public double getAvg() { + return sum / numberOfSamples; + } + + /** + * Find and update min, max and avg for a minute + */ + public void updateAggregates(MetricHostAggregate hostAggregate) { + updateMax(hostAggregate.getMax()); + updateMin(hostAggregate.getMin()); + updateSum(hostAggregate.getSum()); + updateNumberOfSamples(hostAggregate.getNumberOfSamples()); + } + + @Override + public String toString() { + return "MetricHostAggregate{" + + "sum=" + sum + + ", numberOfSamples=" + numberOfSamples + + ", deviation=" + deviation + + ", max=" + max + + ", min=" + min + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java new file mode 
100644 index 0000000..3c30a6f --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; + +public class TimelineClusterMetric { + private String metricName; + private String appId; + private String instanceId; + private long timestamp; + private String type; + + public TimelineClusterMetric(String metricName, String appId, String instanceId, + long timestamp, String type) { + this.metricName = metricName; + this.appId = appId; + this.instanceId = instanceId; + this.timestamp = timestamp; + this.type = type; + } + + public String getMetricName() { + return metricName; + } + + public String getAppId() { + return appId; + } + + public String getInstanceId() { + return instanceId; + } + + public long getTimestamp() { + return timestamp; + } + + public String getType() { return type; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TimelineClusterMetric that = (TimelineClusterMetric) o; + + if (timestamp != that.timestamp) return false; + if (appId != null ? !appId.equals(that.appId) : that.appId != null) + return false; + if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null) + return false; + if (!metricName.equals(that.metricName)) return false; + + return true; + } + + public boolean equalsExceptTime(TimelineClusterMetric metric) { + if (!metricName.equals(metric.metricName)) return false; + if (!appId.equals(metric.appId)) return false; + if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null) + return false; + + return true; + } + @Override + public int hashCode() { + int result = metricName.hashCode(); + result = 31 * result + (appId != null ? appId.hashCode() : 0); + result = 31 * result + (instanceId != null ? 
instanceId.hashCode() : 0); + result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); + return result; + } + + @Override + public String toString() { + return "TimelineClusterMetric{" + + "metricName='" + metricName + '\'' + + ", appId='" + appId + '\'' + + ", instanceId='" + instanceId + '\'' + + ", timestamp=" + timestamp + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java new file mode 100644 index 0000000..3df88d2 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetricReader.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; + +import java.sql.ResultSet; +import java.sql.SQLException; + +public class TimelineClusterMetricReader { + + private boolean ignoreInstance; + + public TimelineClusterMetricReader(boolean ignoreInstance) { + this.ignoreInstance = ignoreInstance; + } + + public TimelineClusterMetric fromResultSet(ResultSet rs) + throws SQLException { + + return new TimelineClusterMetric( + rs.getString("METRIC_NAME"), + rs.getString("APP_ID"), + ignoreInstance ? null : rs.getString("INSTANCE_ID"), + rs.getLong("SERVER_TIME"), + rs.getString("UNITS")); + } +} + http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java new file mode 100644 index 0000000..a2887ea --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;

/**
 * Periodically rolls up per-host metric records read from {@code tableName}
 * into {@link MetricHostAggregate} summaries (sum/min/max/sample count) and
 * writes them to {@code outputTableName} via the Phoenix accessor.
 * Scheduling, checkpointing and the read loop are inherited from
 * AbstractTimelineAggregator; this class supplies the query condition and the
 * aggregation step.
 */
public class TimelineMetricAggregator extends AbstractTimelineAggregator {
  private static final Log LOG = LogFactory.getLog
    (TimelineMetricAggregator.class);

  // File-system path for the aggregator's checkpoint marker.
  private final String checkpointLocation;
  // Pause between aggregation runs, in milliseconds.
  private final Long sleepIntervalMillis;
  private final Integer checkpointCutOffMultiplier;
  // Config key consulted by isDisabled() to turn this aggregator off.
  private final String hostAggregatorDisabledParam;
  // Source table read by prepareMetricQueryCondition().
  private final String tableName;
  // Destination table for the rolled-up records.
  private final String outputTableName;
  // Passed to getNaiveTimeRangeHint(); presumably a lag allowance for the
  // query time-range hint — TODO confirm exact semantics in PhoenixTransactSQL.
  private final Long nativeTimeRangeDelay;

  /**
   * @param hBaseAccessor               Phoenix/HBase accessor used for reads and writes
   * @param metricsConf                 metrics configuration (also read by isDisabled())
   * @param checkpointLocation          path of the checkpoint file
   * @param sleepIntervalMillis         delay between aggregation cycles
   * @param checkpointCutOffMultiplier  multiplier controlling checkpoint cut-off
   * @param hostAggregatorDisabledParam config key that disables this aggregator
   * @param tableName                   input (finer resolution) table
   * @param outputTableName             output (coarser resolution) table
   * @param nativeTimeRangeDelay        delay used for the query time-range hint
   */
  public TimelineMetricAggregator(PhoenixHBaseAccessor hBaseAccessor,
                                  Configuration metricsConf,
                                  String checkpointLocation,
                                  Long sleepIntervalMillis,
                                  Integer checkpointCutOffMultiplier,
                                  String hostAggregatorDisabledParam,
                                  String tableName,
                                  String outputTableName,
                                  Long nativeTimeRangeDelay) {
    super(hBaseAccessor, metricsConf);
    this.checkpointLocation = checkpointLocation;
    this.sleepIntervalMillis = sleepIntervalMillis;
    this.checkpointCutOffMultiplier = checkpointCutOffMultiplier;
    this.hostAggregatorDisabledParam = hostAggregatorDisabledParam;
    this.tableName = tableName;
    this.outputTableName = outputTableName;
    this.nativeTimeRangeDelay = nativeTimeRangeDelay;
  }

  @Override
  protected String getCheckpointLocation() {
    return checkpointLocation;
  }

  /**
   * Aggregation step: folds the query results into per-metric host
   * aggregates and persists them to the output table.
   */
  @Override
  protected void aggregate(ResultSet rs, long startTime, long endTime)
    throws IOException, SQLException {
    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
      aggregateMetricsFromResultSet(rs);

    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
    hBaseAccessor.saveHostAggregateRecords(hostAggregateMap,
      outputTableName);
  }

  /**
   * Builds the unbounded (no-limit) query over [startTime, endTime).
   * The ORDER BY on the full metric key plus SERVER_TIME guarantees rows for
   * the same metric arrive adjacently, which aggregateMetricsFromResultSet()
   * relies on.
   */
  @Override
  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
    Condition condition = new DefaultCondition(null, null, null, null, startTime,
      endTime, null, null, true);
    condition.setNoLimit();
    condition.setFetchSize(resultsetFetchSize);
    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay),
      tableName));
    condition.addOrderByColumn("METRIC_NAME");
    condition.addOrderByColumn("HOSTNAME");
    condition.addOrderByColumn("APP_ID");
    condition.addOrderByColumn("INSTANCE_ID");
    condition.addOrderByColumn("SERVER_TIME");
    return condition;
  }

  /**
   * Single pass over the ordered result set, grouping adjacent rows that
   * share the same metric key (compared ignoring time) into one
   * MetricHostAggregate per key.
   */
  private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet
    (ResultSet rs) throws IOException, SQLException {
    TimelineMetric existingMetric = null;
    MetricHostAggregate hostAggregate = null;
    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
      new HashMap<TimelineMetric, MetricHostAggregate>();

    while (rs.next()) {
      TimelineMetric currentMetric =
        PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
      MetricHostAggregate currentHostAggregate =
        PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);

      if (existingMetric == null) {
        // First row
        existingMetric = currentMetric;
        hostAggregate = new MetricHostAggregate();
        hostAggregateMap.put(currentMetric, hostAggregate);
      }

      if (existingMetric.equalsExceptTime(currentMetric)) {
        // Recalculate totals with current metric
        hostAggregate.updateAggregates(currentHostAggregate);
      } else {
        // Switched over to a new metric - save existing - create new aggregate
        hostAggregate = new MetricHostAggregate();
        hostAggregate.updateAggregates(currentHostAggregate);
        hostAggregateMap.put(currentMetric, hostAggregate);
        existingMetric = currentMetric;
      }
    }
    return hostAggregateMap;
  }

  @Override
  protected Long getSleepIntervalMillis() {
    return sleepIntervalMillis;
  }

  @Override
  protected Integer getCheckpointCutOffMultiplier() {
    return checkpointCutOffMultiplier;
  }

  /**
   * True when the configured disable flag is set; defaults to enabled.
   * metricsConf is inherited from AbstractTimelineAggregator.
   */
  @Override
  public boolean isDisabled() {
    return metricsConf.getBoolean(hostAggregatorDisabledParam, false);
  }
}