http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java new file mode 100644 index 0000000..a0e4e32 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; + +import org.apache.commons.io.FilenameUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL; +import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; + +public class TimelineMetricAggregatorFactory { + private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE = + "timeline-metrics-host-aggregator-checkpoint"; + private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE = + "timeline-metrics-host-aggregator-hourly-checkpoint"; + + public static TimelineMetricAggregator createTimelineMetricAggregatorMinute + (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + String checkpointLocation = FilenameUtils.concat(checkpointDir, + MINUTE_AGGREGATE_CHECKPOINT_FILE); + long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l)); // 5 mins + + int checkpointCutOffMultiplier = metricsConf.getInt + (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3); + String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED; + + String inputTableName = METRICS_RECORD_TABLE_NAME; + String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME; + + return new TimelineMetricAggregator(hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + hostAggregatorDisabledParam, + inputTableName, + outputTableName, + 120000l); + } + + public static TimelineMetricAggregator createTimelineMetricAggregatorHourly + (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + String checkpointLocation = FilenameUtils.concat(checkpointDir, + MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE); + long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l)); + + int checkpointCutOffMultiplier = 
metricsConf.getInt + (HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2); + String hostAggregatorDisabledParam = HOST_AGGREGATOR_HOUR_DISABLED; + + String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME; + String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME; + + return new TimelineMetricAggregator(hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + hostAggregatorDisabledParam, + inputTableName, + outputTableName, + 3600000l); + } + + +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java new file mode 100644 index 0000000..0c8ded2 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_APP_ID; + +/** + * Aggregator responsible for providing app level host aggregates. This task + * is accomplished without doing a round trip to storage, rather + * TimelineMetricClusterAggregators are responsible for lifecycle of + * @TimelineMetricAppAggregator and provide the raw data to aggregate. + */ +public class TimelineMetricAppAggregator { + private static final Log LOG = LogFactory.getLog(TimelineMetricAppAggregator.class); + // Lookup to check candidacy of an app + private final List<String> appIdsToAggregate; + // Map to lookup apps on a host + private Map<String, List<String>> hostedAppsMap = new HashMap<String, List<String>>(); + + Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics; + + public TimelineMetricAppAggregator(Configuration metricsConf) { + appIdsToAggregate = getAppIdsForHostAggregation(metricsConf); + LOG.info("AppIds configured for aggregation: " + appIdsToAggregate); + } + + /** + * Lifecycle method to initialize aggregation cycle. + */ + public void init() { + LOG.debug("Initializing aggregation cycle."); + aggregateClusterMetrics = new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); + } + + /** + * Lifecycle method to indicate end of aggregation cycle. 
+ */ + public void cleanup() { + LOG.debug("Cleanup aggregated data."); + aggregateClusterMetrics = null; + } + + /** + * Useful for resetting apps that no-longer need aggregation without restart. + */ + public void destroy() { + LOG.debug("Cleanup aggregated data as well as in-memory state."); + aggregateClusterMetrics = null; + hostedAppsMap = new HashMap<String, List<String>>(); + } + + /** + * Calculate aggregates if the clusterMetric is a Host metric for recorded + * apps that are housed by this host. + * + * @param clusterMetric @TimelineClusterMetric Host / App metric + * @param hostname This is the hostname from which this clusterMetric originated. + * @param metricValue The metric value for this metric. + */ + public void processTimelineClusterMetric(TimelineClusterMetric clusterMetric, + String hostname, Double metricValue) { + + String appId = clusterMetric.getAppId(); + if (appId == null) { + return; // No real use case except tests + } + + // If metric is a host metric and host has apps on it + if (appId.equalsIgnoreCase(HOST_APP_ID)) { + // Candidate metric, update app aggregates + if (hostedAppsMap.containsKey(hostname)) { + updateAppAggregatesFromHostMetric(clusterMetric, hostname, metricValue); + } + } else { + // Build the hostedapps map if not a host metric + // Check app candidacy for host aggregation + if (appIdsToAggregate.contains(appId)) { + List<String> appIds = hostedAppsMap.get(hostname); + if (appIds == null) { + appIds = new ArrayList<String>(); + hostedAppsMap.put(hostname, appIds); + } + if (!appIds.contains(appId)) { + appIds.add(appId); + LOG.info("Adding appId to hosted apps: appId = " + + clusterMetric.getAppId() + ", hostname = " + hostname); + } + } + } + } + + /** + * Build a cluster app metric from a host metric + */ + private void updateAppAggregatesFromHostMetric(TimelineClusterMetric clusterMetric, + String hostname, Double metricValue) { + + if (aggregateClusterMetrics == null) { + LOG.error("Aggregation requested without 
init call."); + return; + } + + List<String> apps = hostedAppsMap.get(hostname); + for (String appId : apps) { + // Add a new cluster aggregate metric if none exists + TimelineClusterMetric appTimelineClusterMetric = + new TimelineClusterMetric(clusterMetric.getMetricName(), + appId, + clusterMetric.getInstanceId(), + clusterMetric.getTimestamp(), + clusterMetric.getType() + ); + + MetricClusterAggregate clusterAggregate = aggregateClusterMetrics.get(appTimelineClusterMetric); + + if (clusterAggregate == null) { + clusterAggregate = new MetricClusterAggregate(metricValue, 1, null, metricValue, metricValue); + aggregateClusterMetrics.put(appTimelineClusterMetric, clusterAggregate); + } else { + clusterAggregate.updateSum(metricValue); + clusterAggregate.updateNumberOfHosts(1); + clusterAggregate.updateMax(metricValue); + clusterAggregate.updateMin(metricValue); + } + + } + } + + /** + * Return current copy of aggregated data. + */ + public Map<TimelineClusterMetric, MetricClusterAggregate> getAggregateClusterMetrics() { + return aggregateClusterMetrics; + } + + private List<String> getAppIdsForHostAggregation(Configuration metricsConf) { + String appIds = metricsConf.get(CLUSTER_AGGREGATOR_APP_IDS); + if (!StringUtils.isEmpty(appIds)) { + return Arrays.asList(StringUtils.stripAll(appIds.split(","))); + } + return Collections.emptyList(); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java new file mode 100644 index 0000000..68b2ba9 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; + + +import org.apache.commons.io.FilenameUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; +import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; + +/** + * Aggregates a metric across all hosts in the cluster. Reads metrics from + * the precision table and saves into the aggregate. + */ +public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator { + private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregator.class); + private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE = + "timeline-metrics-cluster-aggregator-checkpoint"; + private final String checkpointLocation; + private final Long sleepIntervalMillis; + public final int timeSliceIntervalMillis; + private final Integer checkpointCutOffMultiplier; + private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true); + // Aggregator to perform app-level aggregates for host metrics + private final TimelineMetricAppAggregator appAggregator; + + public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor, + Configuration metricsConf) { + super(hBaseAccessor, metricsConf); + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + + checkpointLocation = FilenameUtils.concat(checkpointDir, + CLUSTER_AGGREGATOR_CHECKPOINT_FILE); + + sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 120l)); + timeSliceIntervalMillis = (int)SECONDS.toMillis(metricsConf.getInt + (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 15)); + checkpointCutOffMultiplier = + metricsConf.getInt(CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2); + + appAggregator = new 
TimelineMetricAppAggregator(metricsConf); + } + + @Override + protected String getCheckpointLocation() { + return checkpointLocation; + } + + @Override + protected void aggregate(ResultSet rs, long startTime, long endTime) + throws SQLException, IOException { + List<Long[]> timeSlices = getTimeSlices(startTime, endTime); + // Initialize app aggregates for host metrics + appAggregator.init(); + Map<TimelineClusterMetric, MetricClusterAggregate> + aggregateClusterMetrics = aggregateMetricsFromResultSet(rs, timeSlices); + + LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates."); + hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics); + appAggregator.cleanup(); + } + + @Override + protected Condition prepareMetricQueryCondition(long startTime, long endTime) { + Condition condition = new DefaultCondition(null, null, null, null, startTime, + endTime, null, null, true); + condition.setNoLimit(); + condition.setFetchSize(resultsetFetchSize); + condition.setStatement(String.format(GET_METRIC_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), + METRICS_RECORD_TABLE_NAME)); + condition.addOrderByColumn("METRIC_NAME"); + condition.addOrderByColumn("APP_ID"); + condition.addOrderByColumn("INSTANCE_ID"); + condition.addOrderByColumn("SERVER_TIME"); + return condition; + } + + private List<Long[]> getTimeSlices(long startTime, long endTime) { + List<Long[]> timeSlices = new ArrayList<Long[]>(); + long sliceStartTime = startTime; + while (sliceStartTime < endTime) { + timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis}); + sliceStartTime += timeSliceIntervalMillis; + } + return timeSlices; + } + + private Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices) + throws SQLException, IOException { + Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = + new HashMap<TimelineClusterMetric, 
MetricClusterAggregate>(); + // Create time slices + + while (rs.next()) { + TimelineMetric metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs); + + Map<TimelineClusterMetric, Double> clusterMetrics = + sliceFromTimelineMetric(metric, timeSlices); + + if (clusterMetrics != null && !clusterMetrics.isEmpty()) { + for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry : + clusterMetrics.entrySet()) { + + TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey(); + Double avgValue = clusterMetricEntry.getValue(); + + MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric); + + if (aggregate == null) { + aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue); + aggregateClusterMetrics.put(clusterMetric, aggregate); + } else { + aggregate.updateSum(avgValue); + aggregate.updateNumberOfHosts(1); + aggregate.updateMax(avgValue); + aggregate.updateMin(avgValue); + } + // Update app level aggregates + appAggregator.processTimelineClusterMetric(clusterMetric, + metric.getHostName(), avgValue); + } + } + } + // Add app level aggregates to save + aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics()); + return aggregateClusterMetrics; + } + + @Override + protected Long getSleepIntervalMillis() { + return sleepIntervalMillis; + } + + @Override + protected Integer getCheckpointCutOffMultiplier() { + return checkpointCutOffMultiplier; + } + + @Override + public boolean isDisabled() { + return metricsConf.getBoolean(CLUSTER_AGGREGATOR_MINUTE_DISABLED, false); + } + + private Map<TimelineClusterMetric, Double> sliceFromTimelineMetric( + TimelineMetric timelineMetric, List<Long[]> timeSlices) { + + if (timelineMetric.getMetricValues().isEmpty()) { + return null; + } + + Map<TimelineClusterMetric, Double> timelineClusterMetricMap = + new HashMap<TimelineClusterMetric, Double>(); + + for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) { + // TODO: 
investigate null values - pre filter + if (metric.getValue() == null) { + continue; + } + Long timestamp = getSliceTimeForMetric(timeSlices, + Long.parseLong(metric.getKey().toString())); + if (timestamp != -1) { + // Metric is within desired time range + TimelineClusterMetric clusterMetric = new TimelineClusterMetric( + timelineMetric.getMetricName(), + timelineMetric.getAppId(), + timelineMetric.getInstanceId(), + timestamp, + timelineMetric.getType()); + if (!timelineClusterMetricMap.containsKey(clusterMetric)) { + timelineClusterMetricMap.put(clusterMetric, metric.getValue()); + } else { + Double oldValue = timelineClusterMetricMap.get(clusterMetric); + Double newValue = (oldValue + metric.getValue()) / 2; + timelineClusterMetricMap.put(clusterMetric, newValue); + } + } + } + + return timelineClusterMetricMap; + } + + /** + * Return beginning of the time slice into which the metric fits. + */ + private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) { + for (Long[] timeSlice : timeSlices) { + if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) { + return timeSlice[0]; + } + } + return -1l; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java new file mode 100644 index 0000000..264e4e6 --- /dev/null +++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorHourly.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; + +import org.apache.commons.io.FilenameUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED; +import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; + +public class TimelineMetricClusterAggregatorHourly extends AbstractTimelineAggregator { + private static final Log LOG = LogFactory.getLog + (TimelineMetricClusterAggregatorHourly.class); + private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE = + "timeline-metrics-cluster-aggregator-hourly-checkpoint"; + private final String checkpointLocation; + private final long sleepIntervalMillis; + private final Integer checkpointCutOffMultiplier; + private long checkpointCutOffIntervalMillis; + private static final Long NATIVE_TIME_RANGE_DELTA = 3600000l; // 1 hour + private final TimelineClusterMetricReader timelineClusterMetricReader + = new TimelineClusterMetricReader(true); + + public TimelineMetricClusterAggregatorHourly( + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + super(hBaseAccessor, metricsConf); + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + + checkpointLocation = FilenameUtils.concat(checkpointDir, + CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE); + + sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l)); + checkpointCutOffIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL, 7200l)); + checkpointCutOffMultiplier = metricsConf.getInt + (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2); + } + + @Override + protected String getCheckpointLocation() { + return checkpointLocation; + } + + @Override + 
protected void aggregate(ResultSet rs, long startTime, long endTime) + throws SQLException, IOException { + Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = + aggregateMetricsFromResultSet(rs); + + LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates."); + hBaseAccessor.saveClusterAggregateHourlyRecords(hostAggregateMap, + METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME); + } + + @Override + protected Condition prepareMetricQueryCondition(long startTime, + long endTime) { + Condition condition = new DefaultCondition(null, null, null, null, startTime, + endTime, null, null, true); + condition.setNoLimit(); + condition.setFetchSize(resultsetFetchSize); + condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), + METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); + condition.addOrderByColumn("METRIC_NAME"); + condition.addOrderByColumn("APP_ID"); + condition.addOrderByColumn("INSTANCE_ID"); + condition.addOrderByColumn("SERVER_TIME"); + return condition; + } + + private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs) + throws IOException, SQLException { + + TimelineClusterMetric existingMetric = null; + MetricHostAggregate hostAggregate = null; + Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = + new HashMap<TimelineClusterMetric, MetricHostAggregate>(); + + while (rs.next()) { + TimelineClusterMetric currentMetric = + timelineClusterMetricReader.fromResultSet(rs); + MetricClusterAggregate currentHostAggregate = + getMetricClusterAggregateFromResultSet(rs); + + if (existingMetric == null) { + // First row + existingMetric = currentMetric; + hostAggregate = new MetricHostAggregate(); + hostAggregateMap.put(currentMetric, hostAggregate); + } + + if (existingMetric.equalsExceptTime(currentMetric)) { + // Recalculate totals with current metric + updateAggregatesFromHost(hostAggregate, currentHostAggregate); + 
+ } else { + // Switched over to a new metric - save existing + hostAggregate = new MetricHostAggregate(); + updateAggregatesFromHost(hostAggregate, currentHostAggregate); + hostAggregateMap.put(currentMetric, hostAggregate); + existingMetric = currentMetric; + } + + } + + return hostAggregateMap; + } + + private void updateAggregatesFromHost(MetricHostAggregate agg, MetricClusterAggregate currentClusterAggregate) { + agg.updateMax(currentClusterAggregate.getMax()); + agg.updateMin(currentClusterAggregate.getMin()); + agg.updateSum(currentClusterAggregate.getSum()); + agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts()); + } + + @Override + protected Long getSleepIntervalMillis() { + return sleepIntervalMillis; + } + + @Override + protected Integer getCheckpointCutOffMultiplier() { + return checkpointCutOffMultiplier; + } + + @Override + protected Long getCheckpointCutOffIntervalMillis() { + return checkpointCutOffIntervalMillis; + } + + @Override + public boolean isDisabled() { + return metricsConf.getBoolean(CLUSTER_AGGREGATOR_HOUR_DISABLED, false); + } + + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java new file mode 100644 index 0000000..40a9648 --- /dev/null +++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; + + +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; + +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; +import java.util.TreeMap; + +public class TimelineMetricReadHelper { + + private boolean ignoreInstance = false; + + public TimelineMetricReadHelper() {} + + public TimelineMetricReadHelper(boolean ignoreInstance) { + this.ignoreInstance = ignoreInstance; + } + + public TimelineMetric getTimelineMetricFromResultSet(ResultSet rs) + throws SQLException, IOException { + TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs); + Map<Long, Double> sortedByTimeMetrics = new TreeMap<Long, Double>( + PhoenixHBaseAccessor.readMetricFromJSON(rs.getString("METRICS"))); + metric.setMetricValues(sortedByTimeMetrics); + return metric; + } + + /** + * Returns common part of timeline metrics record without the values. 
+ */ + public TimelineMetric getTimelineMetricCommonsFromResultSet(ResultSet rs) + throws SQLException { + TimelineMetric metric = new TimelineMetric(); + metric.setMetricName(rs.getString("METRIC_NAME")); + metric.setAppId(rs.getString("APP_ID")); + if (!ignoreInstance) metric.setInstanceId(rs.getString("INSTANCE_ID")); + metric.setHostName(rs.getString("HOSTNAME")); + metric.setTimestamp(rs.getLong("SERVER_TIME")); + metric.setStartTime(rs.getLong("START_TIME")); + metric.setType(rs.getString("UNITS")); + return metric; + } + +} + http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java new file mode 100644 index 0000000..b52748f --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/Condition.java @@ -0,0 +1,46 @@ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; + +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Precision; + +import java.util.List; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public interface Condition { + boolean isEmpty(); + + List<String> getMetricNames(); + boolean isPointInTime(); + boolean isGrouped(); + void setStatement(String statement); + String getHostname(); + Precision getPrecision(); + void setPrecision(Precision precision); + String getAppId(); + String getInstanceId(); + StringBuilder getConditionClause(); + String getOrderByClause(boolean asc); + String getStatement(); + Long getStartTime(); + Long getEndTime(); + Integer getLimit(); + Integer getFetchSize(); + void setFetchSize(Integer fetchSize); + void addOrderByColumn(String column); + void setNoLimit(); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java new file mode 100644 index 0000000..24239a0 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + 
/**
 * Supplies JDBC connections. How a connection is obtained is left to the
 * implementation.
 */
public interface ConnectionProvider {

  /**
   * @return a JDBC connection
   * @throws SQLException if the connection cannot be obtained
   */
  Connection getConnection() throws SQLException;
}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Precision; + +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +public class DefaultCondition implements Condition { + List<String> metricNames; + String hostname; + String appId; + String instanceId; + Long startTime; + Long endTime; + Precision precision; + Integer limit; + boolean grouped; + boolean noLimit = false; + Integer fetchSize; + String statement; + Set<String> orderByColumns = new LinkedHashSet<String>(); + + public DefaultCondition(List<String> metricNames, String hostname, String appId, + String instanceId, Long startTime, Long endTime, Precision precision, + Integer limit, boolean grouped) { + this.metricNames = metricNames; + this.hostname = hostname; + this.appId = appId; + this.instanceId = instanceId; + this.startTime = startTime; + this.endTime = endTime; + this.precision = precision; + this.limit = limit; + this.grouped = grouped; + } + + public String getStatement() { + return statement; + } + + public void setStatement(String statement) { + 
this.statement = statement; + } + + public List<String> getMetricNames() { + return metricNames == null || metricNames.isEmpty() ? null : metricNames; + } + + public StringBuilder getConditionClause() { + StringBuilder sb = new StringBuilder(); + boolean appendConjunction = false; + StringBuilder metricsLike = new StringBuilder(); + StringBuilder metricsIn = new StringBuilder(); + + if (getMetricNames() != null) { + for (String name : getMetricNames()) { + if (name.contains("%")) { + if (metricsLike.length() > 1) { + metricsLike.append(" OR "); + } + metricsLike.append("METRIC_NAME LIKE ?"); + } else { + if (metricsIn.length() > 0) { + metricsIn.append(", "); + } + metricsIn.append("?"); + } + } + + if (metricsIn.length()>0) { + sb.append("(METRIC_NAME IN ("); + sb.append(metricsIn); + sb.append(")"); + appendConjunction = true; + } + + if (metricsLike.length() > 0) { + if (appendConjunction) { + sb.append(" OR "); + } else { + sb.append("("); + } + sb.append(metricsLike); + appendConjunction = true; + } + + if (appendConjunction) { + sb.append(")"); + } + } + + appendConjunction = append(sb, appendConjunction, getHostname(), " HOSTNAME = ?"); + appendConjunction = append(sb, appendConjunction, getAppId(), " APP_ID = ?"); + appendConjunction = append(sb, appendConjunction, getInstanceId(), " INSTANCE_ID = ?"); + appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?"); + append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?"); + + return sb; + } + + protected static boolean append(StringBuilder sb, + boolean appendConjunction, + Object value, String str) { + if (value != null) { + if (appendConjunction) { + sb.append(" AND"); + } + + sb.append(str); + appendConjunction = true; + } + return appendConjunction; + } + + public String getHostname() { + return hostname == null || hostname.isEmpty() ? 
null : hostname; + } + + public Precision getPrecision() { + return precision; + } + + public void setPrecision(Precision precision) { + this.precision = precision; + } + + public String getAppId() { + if (appId != null && !appId.isEmpty()) { + if (!(appId.equals("HOST") || appId.equals("FLUME_HANDLER")) ) { + return appId.toLowerCase(); + } else { + return appId; + } + } + return null; + } + + public String getInstanceId() { + return instanceId == null || instanceId.isEmpty() ? null : instanceId; + } + + /** + * Convert to millis. + */ + public Long getStartTime() { + if (startTime == null) { + return null; + } else if (startTime < 9999999999l) { + return startTime * 1000; + } else { + return startTime; + } + } + + public Long getEndTime() { + if (endTime == null) { + return null; + } + if (endTime < 9999999999l) { + return endTime * 1000; + } else { + return endTime; + } + } + + public void setNoLimit() { + this.noLimit = true; + } + + public Integer getLimit() { + if (noLimit) { + return null; + } + return limit == null ? 
PhoenixHBaseAccessor.RESULTSET_LIMIT : limit; + } + + public boolean isGrouped() { + return grouped; + } + + public boolean isPointInTime() { + return getStartTime() == null && getEndTime() == null; + } + + public boolean isEmpty() { + return (metricNames == null || metricNames.isEmpty()) + && (hostname == null || hostname.isEmpty()) + && (appId == null || appId.isEmpty()) + && (instanceId == null || instanceId.isEmpty()) + && startTime == null + && endTime == null; + } + + public Integer getFetchSize() { + return fetchSize; + } + + public void setFetchSize(Integer fetchSize) { + this.fetchSize = fetchSize; + } + + public void addOrderByColumn(String column) { + orderByColumns.add(column); + } + + public String getOrderByClause(boolean asc) { + String orderByStr = " ORDER BY "; + if (!orderByColumns.isEmpty()) { + StringBuilder sb = new StringBuilder(orderByStr); + for (String orderByColumn : orderByColumns) { + if (sb.length() != orderByStr.length()) { + sb.append(", "); + } + sb.append(orderByColumn); + if (!asc) { + sb.append(" DESC"); + } + } + sb.append(" "); + return sb.toString(); + } + return null; + } + + @Override + public String toString() { + return "Condition{" + + "metricNames=" + metricNames + + ", hostname='" + hostname + '\'' + + ", appId='" + appId + '\'' + + ", instanceId='" + instanceId + '\'' + + ", startTime=" + startTime + + ", endTime=" + endTime + + ", limit=" + limit + + ", grouped=" + grouped + + ", orderBy=" + orderByColumns + + ", noLimit=" + noLimit + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java new file mode 100644 index 0000000..562049b --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +public class DefaultPhoenixDataSource implements ConnectionProvider { + + static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class); + private static final String ZOOKEEPER_CLIENT_PORT = + "hbase.zookeeper.property.clientPort"; + private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; + private static final String ZNODE_PARENT = "zookeeper.znode.parent"; + + private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s"; + private final String url; + + public DefaultPhoenixDataSource(Configuration hbaseConf) { + String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT, + "2181"); + String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM); + String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/hbase"); + if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) { + throw new IllegalStateException("Unable to find Zookeeper quorum to " + + "access HBase store using Phoenix."); + } + + url = String.format(connectionUrl, + zookeeperQuorum, + zookeeperClientPort, + znodeParent); + } + + /** + * Get JDBC connection to HBase store. Assumption is that the hbase + * configuration is present on the classpath and loaded by the caller into + * the Configuration object. + * Phoenix already caches the HConnection between the client and HBase + * cluster. 
+ * + * @return @java.sql.Connection + */ + public Connection getConnection() throws SQLException { + + LOG.debug("Metric store connection url: " + url); + try { + return DriverManager.getConnection(url); + } catch (SQLException e) { + LOG.warn("Unable to connect to HBase store using Phoenix.", e); + + throw e; + } + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java new file mode 100644 index 0000000..636999f --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java @@ -0,0 +1,573 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Precision; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Encapsulate all metrics related SQL queries. + */ +public class PhoenixTransactSQL { + + public static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class); + + /** + * Create table to store individual metric records. + */ + public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " + + "EXISTS METRIC_RECORD (METRIC_NAME VARCHAR, " + + "HOSTNAME VARCHAR, " + + "SERVER_TIME UNSIGNED_LONG NOT NULL, " + + "APP_ID VARCHAR, " + + "INSTANCE_ID VARCHAR, " + + "START_TIME UNSIGNED_LONG, " + + "UNITS CHAR(20), " + + "METRIC_SUM DOUBLE, " + + "METRIC_COUNT UNSIGNED_INT, " + + "METRIC_MAX DOUBLE, " + + "METRIC_MIN DOUBLE, " + + "METRICS VARCHAR CONSTRAINT pk " + + "PRIMARY KEY (METRIC_NAME, HOSTNAME, SERVER_TIME, APP_ID, " + + "INSTANCE_ID)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + + "TTL=%s, COMPRESSION='%s'"; + + public static final String CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS METRIC_RECORD_HOURLY " + + "(METRIC_NAME VARCHAR, " + + "HOSTNAME VARCHAR, " + + "APP_ID VARCHAR, " + + "INSTANCE_ID VARCHAR, " + + "SERVER_TIME UNSIGNED_LONG NOT NULL, " + + "UNITS CHAR(20), " + + "METRIC_SUM DOUBLE," + + "METRIC_COUNT UNSIGNED_INT, " + + "METRIC_MAX DOUBLE," + + "METRIC_MIN DOUBLE 
CONSTRAINT pk " + + "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " + + "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + + "TTL=%s, COMPRESSION='%s'"; + + public static final String CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS METRIC_RECORD_MINUTE " + + "(METRIC_NAME VARCHAR, " + + "HOSTNAME VARCHAR, " + + "APP_ID VARCHAR, " + + "INSTANCE_ID VARCHAR, " + + "SERVER_TIME UNSIGNED_LONG NOT NULL, " + + "UNITS CHAR(20), " + + "METRIC_SUM DOUBLE," + + "METRIC_COUNT UNSIGNED_INT, " + + "METRIC_MAX DOUBLE," + + "METRIC_MIN DOUBLE CONSTRAINT pk " + + "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " + + "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," + + " COMPRESSION='%s'"; + + public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE " + + "(METRIC_NAME VARCHAR, " + + "APP_ID VARCHAR, " + + "INSTANCE_ID VARCHAR, " + + "SERVER_TIME UNSIGNED_LONG NOT NULL, " + + "UNITS CHAR(20), " + + "METRIC_SUM DOUBLE, " + + "HOSTS_COUNT UNSIGNED_INT, " + + "METRIC_MAX DOUBLE, " + + "METRIC_MIN DOUBLE " + + "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " + + "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + + "TTL=%s, COMPRESSION='%s'"; + + public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE_HOURLY " + + "(METRIC_NAME VARCHAR, " + + "APP_ID VARCHAR, " + + "INSTANCE_ID VARCHAR, " + + "SERVER_TIME UNSIGNED_LONG NOT NULL, " + + "UNITS CHAR(20), " + + "METRIC_SUM DOUBLE, " + + "METRIC_COUNT UNSIGNED_INT, " + + "METRIC_MAX DOUBLE, " + + "METRIC_MIN DOUBLE " + + "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " + + "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + + "TTL=%s, COMPRESSION='%s'"; + + /** + * ALTER table to set new options + */ + public static final String ALTER_SQL = "ALTER TABLE %s SET TTL=%s"; + 
+ /** + * Insert into metric records table. + */ + public static final String UPSERT_METRICS_SQL = "UPSERT INTO %s " + + "(METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, " + + "UNITS, " + + "METRIC_SUM, " + + "METRIC_MAX, " + + "METRIC_MIN, " + + "METRIC_COUNT, " + + "METRICS) VALUES " + + "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " + + "METRIC_AGGREGATE (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " + + "UNITS, " + + "METRIC_SUM, " + + "HOSTS_COUNT, " + + "METRIC_MAX, " + + "METRIC_MIN) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + public static final String UPSERT_CLUSTER_AGGREGATE_TIME_SQL = "UPSERT INTO" + + " %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " + + "UNITS, " + + "METRIC_SUM, " + + "METRIC_COUNT, " + + "METRIC_MAX, " + + "METRIC_MIN) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + + public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " + + "%s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " + + "SERVER_TIME, " + + "UNITS, " + + "METRIC_SUM, " + + "METRIC_MAX, " + + "METRIC_MIN," + + "METRIC_COUNT) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + /** + * Retrieve a set of rows from metrics records table. 
+ */ + public static final String GET_METRIC_SQL = "SELECT %s METRIC_NAME, " + + "HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, UNITS, " + + "METRIC_SUM, " + + "METRIC_MAX, " + + "METRIC_MIN, " + + "METRIC_COUNT, " + + "METRICS " + + "FROM %s"; + + public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT %s " + + "METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, " + + "UNITS, " + + "METRIC_SUM, " + + "METRIC_MAX, " + + "METRIC_MIN, " + + "METRIC_COUNT " + + "FROM %s"; + + public static final String GET_CLUSTER_AGGREGATE_SQL = "SELECT %s " + + "METRIC_NAME, APP_ID, " + + "INSTANCE_ID, SERVER_TIME, " + + "UNITS, " + + "METRIC_SUM, " + + "HOSTS_COUNT, " + + "METRIC_MAX, " + + "METRIC_MIN " + + "FROM %s"; + + public static final String GET_CLUSTER_AGGREGATE_HOURLY_SQL = "SELECT %s " + + "METRIC_NAME, APP_ID, " + + "INSTANCE_ID, SERVER_TIME, " + + "UNITS, " + + "METRIC_SUM, " + + "METRIC_COUNT, " + + "METRIC_MAX, " + + "METRIC_MIN " + + "FROM %s"; + + public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD"; + public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME = + "METRIC_RECORD_MINUTE"; + public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME = + "METRIC_RECORD_HOURLY"; + public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME = + "METRIC_AGGREGATE"; + public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME = + "METRIC_AGGREGATE_HOURLY"; + public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY"; + public static final String DEFAULT_ENCODING = "FAST_DIFF"; + public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes + public static final long HOUR = 3600000; // 1 hour + public static final long DAY = 86400000; // 1 day + + /** + * Filter to optimize HBase scan by using file timestamps. This prevents + * a full table scan of metric records. 
+ * + * @return Phoenix Hint String + */ + public static String getNaiveTimeRangeHint(Long startTime, Long delta) { + return String.format("/*+ NATIVE_TIME_RANGE(%s) */", (startTime - delta)); + } + + public static PreparedStatement prepareGetMetricsSqlStmt( + Connection connection, Condition condition) throws SQLException { + + validateConditionIsNotEmpty(condition); + validateRowCountLimit(condition); + + String stmtStr; + if (condition.getStatement() != null) { + stmtStr = condition.getStatement(); + } else { + + String metricsTable; + String query; + if (condition.getPrecision() == null) { + long endTime = condition.getEndTime() == null ? System.currentTimeMillis() : condition.getEndTime(); + long startTime = condition.getStartTime() == null ? 0 : condition.getStartTime(); + Long timeRange = endTime - startTime; + if (timeRange > 5 * DAY) { + metricsTable = METRICS_AGGREGATE_HOURLY_TABLE_NAME; + query = GET_METRIC_AGGREGATE_ONLY_SQL; + condition.setPrecision(Precision.HOURS); + } else if (timeRange > 10 * HOUR) { + metricsTable = METRICS_AGGREGATE_MINUTE_TABLE_NAME; + query = GET_METRIC_AGGREGATE_ONLY_SQL; + condition.setPrecision(Precision.MINUTES); + } else { + metricsTable = METRICS_RECORD_TABLE_NAME; + query = GET_METRIC_SQL; + condition.setPrecision(Precision.SECONDS); + } + } else { + switch (condition.getPrecision()) { + case HOURS: + metricsTable = METRICS_AGGREGATE_HOURLY_TABLE_NAME; + query = GET_METRIC_AGGREGATE_ONLY_SQL; + break; + case MINUTES: + metricsTable = METRICS_AGGREGATE_MINUTE_TABLE_NAME; + query = GET_METRIC_AGGREGATE_ONLY_SQL; + break; + default: + metricsTable = METRICS_RECORD_TABLE_NAME; + query = GET_METRIC_SQL; + } + } + + stmtStr = String.format(query, + getNaiveTimeRangeHint(condition.getStartTime(), NATIVE_TIME_RANGE_DELTA), + metricsTable); + } + + StringBuilder sb = new StringBuilder(stmtStr); + sb.append(" WHERE "); + sb.append(condition.getConditionClause()); + String orderByClause = condition.getOrderByClause(true); + + if 
(orderByClause != null) { + sb.append(orderByClause); + } else { + sb.append(" ORDER BY METRIC_NAME, SERVER_TIME "); + } + if (condition.getLimit() != null) { + sb.append(" LIMIT ").append(condition.getLimit()); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("SQL: " + sb.toString() + ", condition: " + condition); + } + PreparedStatement stmt = connection.prepareStatement(sb.toString()); + int pos = 1; + if (condition.getMetricNames() != null) { + for (; pos <= condition.getMetricNames().size(); pos++) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1)); + } + stmt.setString(pos, condition.getMetricNames().get(pos - 1)); + } + } + if (condition.getHostname() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname()); + } + stmt.setString(pos++, condition.getHostname()); + } + if (condition.getAppId() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId()); + } + stmt.setString(pos++, condition.getAppId()); + } + if (condition.getInstanceId() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId()); + } + stmt.setString(pos++, condition.getInstanceId()); + } + if (condition.getStartTime() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime()); + } + stmt.setLong(pos++, condition.getStartTime()); + } + if (condition.getEndTime() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime()); + } + stmt.setLong(pos, condition.getEndTime()); + } + if (condition.getFetchSize() != null) { + stmt.setFetchSize(condition.getFetchSize()); + } + + return stmt; + } + + private static void validateConditionIsNotEmpty(Condition condition) { + if (condition.isEmpty()) { + throw new IllegalArgumentException("Condition is 
empty."); + } + } + + private static void validateRowCountLimit(Condition condition) { + if (condition.getMetricNames() == null + || condition.getMetricNames().size() ==0 ) { + //aggregator can use empty metrics query + return; + } + + long range = condition.getEndTime() - condition.getStartTime(); + long rowsPerMetric = TimeUnit.MILLISECONDS.toHours(range) + 1; + + Precision precision = condition.getPrecision(); + // for minutes and seconds we can use the rowsPerMetric computed based on + // minutes + if (precision != null && precision == Precision.HOURS) { + rowsPerMetric = TimeUnit.MILLISECONDS.toHours(range) + 1; + } + + long totalRowsRequested = rowsPerMetric * condition.getMetricNames().size(); + if (totalRowsRequested > PhoenixHBaseAccessor.RESULTSET_LIMIT) { + throw new IllegalArgumentException("The time range query for " + + "precision table exceeds row count limit, please query aggregate " + + "table instead."); + } + } + + public static PreparedStatement prepareGetLatestMetricSqlStmt( + Connection connection, Condition condition) throws SQLException { + + validateConditionIsNotEmpty(condition); + + if (condition.getMetricNames() == null + || condition.getMetricNames().size() == 0) { + throw new IllegalArgumentException("Point in time query without " + + "metric names not supported "); + } + + String stmtStr; + if (condition.getStatement() != null) { + stmtStr = condition.getStatement(); + } else { + stmtStr = String.format(GET_METRIC_SQL, + "", + METRICS_RECORD_TABLE_NAME); + } + + StringBuilder sb = new StringBuilder(stmtStr); + sb.append(" WHERE "); + sb.append(condition.getConditionClause()); + String orderByClause = condition.getOrderByClause(false); + if (orderByClause != null) { + sb.append(orderByClause); + } else { + sb.append(" ORDER BY METRIC_NAME DESC, HOSTNAME DESC, SERVER_TIME DESC "); + } + + sb.append(" LIMIT ").append(condition.getMetricNames().size()); + + if (LOG.isDebugEnabled()) { + LOG.debug("SQL: " + sb.toString() + ", condition: " 
+ condition); + } + PreparedStatement stmt = connection.prepareStatement(sb.toString()); + int pos = 1; + if (condition.getMetricNames() != null) { + //IGNORE condition limit, set one based on number of metric names + for (; pos <= condition.getMetricNames().size(); pos++) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1)); + } + stmt.setString(pos, condition.getMetricNames().get(pos - 1)); + } + } + if (condition.getHostname() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname()); + } + stmt.setString(pos++, condition.getHostname()); + } + if (condition.getAppId() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId()); + } + stmt.setString(pos++, condition.getAppId()); + } + if (condition.getInstanceId() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId()); + } + stmt.setString(pos++, condition.getInstanceId()); + } + + if (condition.getFetchSize() != null) { + stmt.setFetchSize(condition.getFetchSize()); + } + + return stmt; + } + + public static PreparedStatement prepareGetAggregateSqlStmt( + Connection connection, Condition condition) throws SQLException { + + validateConditionIsNotEmpty(condition); + + String metricsAggregateTable; + String queryStmt; + if (condition.getPrecision() == null) { + long endTime = condition.getEndTime() == null ? System.currentTimeMillis() : condition.getEndTime(); + long startTime = condition.getStartTime() == null ? 
0 : condition.getStartTime(); + Long timeRange = endTime - startTime; + if (timeRange > 5 * DAY) { + metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; + queryStmt = GET_CLUSTER_AGGREGATE_HOURLY_SQL; + condition.setPrecision(Precision.HOURS); + } else { + metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME; + queryStmt = GET_CLUSTER_AGGREGATE_SQL; + condition.setPrecision(Precision.SECONDS); + } + } else { + switch (condition.getPrecision()) { + case HOURS: + metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; + queryStmt = GET_CLUSTER_AGGREGATE_HOURLY_SQL; + break; + default: + metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME; + queryStmt = GET_CLUSTER_AGGREGATE_SQL; + } + } + + StringBuilder sb = new StringBuilder(queryStmt); + sb.append(" WHERE "); + sb.append(condition.getConditionClause()); + sb.append(" ORDER BY METRIC_NAME, SERVER_TIME"); + if (condition.getLimit() != null) { + sb.append(" LIMIT ").append(condition.getLimit()); + } + + String query = String.format(sb.toString(), + PhoenixTransactSQL.getNaiveTimeRangeHint(condition.getStartTime(), + NATIVE_TIME_RANGE_DELTA), metricsAggregateTable); + if (LOG.isDebugEnabled()) { + LOG.debug("SQL => " + query + ", condition => " + condition); + } + PreparedStatement stmt = connection.prepareStatement(query); + int pos = 1; + if (condition.getMetricNames() != null) { + for (; pos <= condition.getMetricNames().size(); pos++) { + stmt.setString(pos, condition.getMetricNames().get(pos - 1)); + } + } + // TODO: Upper case all strings on POST + if (condition.getAppId() != null) { + stmt.setString(pos++, condition.getAppId()); + } + if (condition.getInstanceId() != null) { + stmt.setString(pos++, condition.getInstanceId()); + } + if (condition.getStartTime() != null) { + stmt.setLong(pos++, condition.getStartTime()); + } + if (condition.getEndTime() != null) { + stmt.setLong(pos, condition.getEndTime()); + } + + return stmt; + } + + public static 
PreparedStatement prepareGetLatestAggregateMetricSqlStmt( + Connection connection, Condition condition) throws SQLException { + + validateConditionIsNotEmpty(condition); + + String stmtStr; + if (condition.getStatement() != null) { + stmtStr = condition.getStatement(); + } else { + stmtStr = String.format(GET_CLUSTER_AGGREGATE_SQL, "", + METRICS_CLUSTER_AGGREGATE_TABLE_NAME); + } + + StringBuilder sb = new StringBuilder(stmtStr); + sb.append(" WHERE "); + sb.append(condition.getConditionClause()); + String orderByClause = condition.getOrderByClause(false); + if (orderByClause != null) { + sb.append(orderByClause); + } else { + sb.append(" ORDER BY METRIC_NAME DESC, SERVER_TIME DESC "); + } + + sb.append(" LIMIT ").append(condition.getMetricNames().size()); + + String query = sb.toString(); + if (LOG.isDebugEnabled()) { + LOG.debug("SQL: " + query + ", condition: " + condition); + } + + PreparedStatement stmt = connection.prepareStatement(query); + int pos = 1; + if (condition.getMetricNames() != null) { + for (; pos <= condition.getMetricNames().size(); pos++) { + stmt.setString(pos, condition.getMetricNames().get(pos - 1)); + } + } + if (condition.getAppId() != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId()); + } + stmt.setString(pos++, condition.getAppId()); + } + if (condition.getInstanceId() != null) { + stmt.setString(pos++, condition.getInstanceId()); + } + + return stmt; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java new file mode 100644 index 0000000..00d6a82 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query; + +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Precision; + +import java.util.Collections; +import java.util.List; + +public class SplitByMetricNamesCondition implements Condition { + private final Condition adaptee; + private String currentMetric; + + public SplitByMetricNamesCondition(Condition condition){ + this.adaptee = condition; + } + + @Override + public boolean isEmpty() { + return adaptee.isEmpty(); + } + + @Override + public List<String> getMetricNames() { + return Collections.singletonList(currentMetric); + } + + @Override + public boolean isPointInTime() { + return adaptee.isPointInTime(); + } + + @Override + public boolean isGrouped() { + return adaptee.isGrouped(); + } + + @Override + public void setStatement(String statement) { + adaptee.setStatement(statement); + } + + @Override + public String getHostname() { + return adaptee.getHostname(); + } + + @Override + public Precision getPrecision() { + return adaptee.getPrecision(); + } + + @Override + public void setPrecision(Precision precision) { + adaptee.setPrecision(precision); + } + + @Override + public String getAppId() { + return adaptee.getAppId(); + } + + @Override + public String getInstanceId() { + return adaptee.getInstanceId(); + } + + @Override + public StringBuilder getConditionClause() { + StringBuilder sb = new StringBuilder(); + boolean appendConjunction = false; + + if (getMetricNames() != null) { + for (String name : getMetricNames()) { + if (sb.length() > 1) { + sb.append(" OR "); + } + sb.append("METRIC_NAME = ?"); + } + + appendConjunction = true; + } + + appendConjunction = DefaultCondition.append(sb, appendConjunction, + getHostname(), " HOSTNAME = ?"); + appendConjunction = DefaultCondition.append(sb, appendConjunction, + getAppId(), " APP_ID = ?"); + appendConjunction = DefaultCondition.append(sb, appendConjunction, + getInstanceId(), " INSTANCE_ID 
= ?"); + appendConjunction = DefaultCondition.append(sb, appendConjunction, + getStartTime(), " SERVER_TIME >= ?"); + DefaultCondition.append(sb, appendConjunction, getEndTime(), + " SERVER_TIME < ?"); + + return sb; + } + + @Override + public String getOrderByClause(boolean asc) { + return adaptee.getOrderByClause(asc); + } + + @Override + public String getStatement() { + return adaptee.getStatement(); + } + + @Override + public Long getStartTime() { + return adaptee.getStartTime(); + } + + @Override + public Long getEndTime() { + return adaptee.getEndTime(); + } + + @Override + public Integer getLimit() { + return adaptee.getLimit(); + } + + @Override + public Integer getFetchSize() { + return adaptee.getFetchSize(); + } + + @Override + public void setFetchSize(Integer fetchSize) { + adaptee.setFetchSize(fetchSize); + } + + @Override + public void addOrderByColumn(String column) { + adaptee.addOrderByColumn(column); + } + + @Override + public void setNoLimit() { + adaptee.setNoLimit(); + } + + public List<String> getOriginalMetricNames() { + return adaptee.getMetricNames(); + } + + public void setCurrentMetric(String currentMetric) { + this.currentMetric = currentMetric; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java index 3720852..e1d256d 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java +++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java @@ -24,8 +24,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics - .timeline.DefaultPhoenixDataSource; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics .timeline.PhoenixHBaseAccessor; import org.apache.zookeeper.ClientCnxn; http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java index b11a977..90c03e4 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider; import org.apache.phoenix.hbase.index.write.IndexWriterUtils; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; @@ -37,7 +38,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LOG; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.LOG; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.assertj.core.api.Assertions.assertThat; http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java index 969192d..c22e734 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator; 
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; import org.apache.hadoop.yarn.util.Clock; import org.junit.Before; import org.junit.Test; @@ -59,7 +62,7 @@ public class AbstractTimelineAggregatorTest { agg = new AbstractTimelineAggregator( null, metricsConf, clock) { @Override - protected boolean doWork(long startTime, long endTime) { + public boolean doWork(long startTime, long endTime) { startTimeInDoWork.set(startTime); endTimeInDoWork.set(endTime); actualRuns++; @@ -68,7 +71,7 @@ public class AbstractTimelineAggregatorTest { } @Override - protected PhoenixTransactSQL.Condition + protected Condition prepareMetricQueryCondition(long startTime, long endTime) { return null; } @@ -89,7 +92,7 @@ public class AbstractTimelineAggregatorTest { } @Override - protected boolean isDisabled() { + public boolean isDisabled() { return false; } http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java index 2a389ac..af9c6bb 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/FunctionTest.java @@ -17,11 +17,12 @@ */ package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; import org.junit.Test; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Function.fromMetricName; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Function.ReadFunction.AVG; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.Function.PostProcessingFunction.RATE; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.fromMetricName; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.ReadFunction.AVG; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.PostProcessingFunction.RATE; import static org.assertj.core.api.Assertions.assertThat; public class FunctionTest {