http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java new file mode 100644 index 0000000..652c492 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +public class DefaultPhoenixDataSource implements ConnectionProvider { + + static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class); + private static final String ZOOKEEPER_CLIENT_PORT = + "hbase.zookeeper.property.clientPort"; + private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; + private static final String ZNODE_PARENT = "zookeeper.znode.parent"; + + private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s"; + private final String url; + + public DefaultPhoenixDataSource(Configuration hbaseConf) { + String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT, + "2181"); + String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM); + String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/hbase"); + if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) { + throw new IllegalStateException("Unable to find Zookeeper quorum to " + + "access HBase store using Phoenix."); + } + + url = String.format(connectionUrl, + zookeeperQuorum, + zookeeperClientPort, + znodeParent); + } + + /** + * Get JDBC connection to HBase store. Assumption is that the hbase + * configuration is present on the classpath and loaded by the caller into + * the Configuration object. + * Phoenix already caches the HConnection between the client and HBase + * cluster. + * + * @return @java.sql.Connection + */ + public Connection getConnection() throws SQLException { + + LOG.debug("Metric store connection url: " + url); + try { + return DriverManager.getConnection(url); + } catch (SQLException e) { + LOG.warn("Unable to connect to HBase store using Phoenix.", e); + + throw e; + } + } + +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java new file mode 100644 index 0000000..9364187 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; + +import java.io.IOException; +import java.net.URL; +import java.sql.SQLException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline.PhoenixTransactSQL.Condition; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline.TimelineMetricConfiguration.HBASE_SITE_CONFIGURATION_FILE; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline.TimelineMetricConfiguration.METRICS_SITE_CONFIGURATION_FILE; + +public class HBaseTimelineMetricStore extends AbstractService + implements TimelineMetricStore { + + static final Log LOG = LogFactory.getLog(HBaseTimelineMetricStore.class); + private PhoenixHBaseAccessor hBaseAccessor; + + /** + * Construct the service. + * + */ + public HBaseTimelineMetricStore() { + super(HBaseTimelineMetricStore.class.getName()); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + if (classLoader == null) { + classLoader = getClass().getClassLoader(); + } + URL hbaseResUrl = classLoader.getResource(HBASE_SITE_CONFIGURATION_FILE); + URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE); + LOG.info("Found hbase site configuration: " + hbaseResUrl); + LOG.info("Found metric service configuration: " + amsResUrl); + + if (hbaseResUrl == null) { + throw new IllegalStateException("Unable to initialize the metrics " + + "subsystem. No hbase-site present in the classpath."); + } + + if (amsResUrl == null) { + throw new IllegalStateException("Unable to initialize the metrics " + + "subsystem. No ams-site present in the classpath."); + } + + Configuration hbaseConf = new Configuration(true); + hbaseConf.addResource(hbaseResUrl.toURI().toURL()); + Configuration metricsConf = new Configuration(true); + metricsConf.addResource(amsResUrl.toURI().toURL()); + + initializeSubsystem(hbaseConf, metricsConf); + } + + private void initializeSubsystem(Configuration hbaseConf, + Configuration metricsConf) { + hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf); + hBaseAccessor.initMetricSchema(); + + // Start the cluster aggregator + TimelineMetricClusterAggregator minuteClusterAggregator = + new TimelineMetricClusterAggregator(hBaseAccessor, metricsConf); + if (!minuteClusterAggregator.isDisabled()) { + Thread aggregatorThread = new Thread(minuteClusterAggregator); + aggregatorThread.start(); + } + + // Start the cluster aggregator hourly + TimelineMetricClusterAggregatorHourly hourlyClusterAggregator = + new TimelineMetricClusterAggregatorHourly(hBaseAccessor, metricsConf); + if (!hourlyClusterAggregator.isDisabled()) { + Thread aggregatorThread = new Thread(hourlyClusterAggregator); + aggregatorThread.start(); + } + + // Start the 5 minute aggregator + TimelineMetricAggregator minuteHostAggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute + (hBaseAccessor, metricsConf); + if (!minuteHostAggregator.isDisabled()) { + Thread minuteAggregatorThread = new Thread(minuteHostAggregator); + minuteAggregatorThread.start(); + } + + // Start hourly host aggregator + TimelineMetricAggregator hourlyHostAggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly + (hBaseAccessor, metricsConf); + if (!hourlyHostAggregator.isDisabled()) { + Thread aggregatorHourlyThread = new Thread(hourlyHostAggregator); + aggregatorHourlyThread.start(); + } + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } + + //TODO: update to work with HOSTS_COUNT and METRIC_COUNT + @Override + public TimelineMetrics getTimelineMetrics(List<String> metricNames, + String hostname, String applicationId, String instanceId, + Long startTime, Long endTime, Integer limit, + boolean groupedByHosts) throws SQLException, IOException { + + Condition condition = new Condition(metricNames, hostname, applicationId, + instanceId, startTime, endTime, limit, groupedByHosts); + + if (hostname == null) { + return hBaseAccessor.getAggregateMetricRecords(condition); + } + + return hBaseAccessor.getMetricRecords(condition); + } + + @Override + public TimelineMetric getTimelineMetric(String metricName, String hostname, + String applicationId, String instanceId, Long startTime, + Long endTime, Integer limit) + throws SQLException, IOException { + + TimelineMetrics metrics = hBaseAccessor.getMetricRecords( + new Condition(Collections.singletonList(metricName), hostname, + applicationId, instanceId, startTime, endTime, limit, true) + ); + + TimelineMetric metric = new TimelineMetric(); + List<TimelineMetric> metricList = metrics.getMetrics(); + + if (metricList != null && !metricList.isEmpty()) { + metric.setMetricName(metricList.get(0).getMetricName()); + metric.setAppId(metricList.get(0).getAppId()); + metric.setInstanceId(metricList.get(0).getInstanceId()); + metric.setHostName(metricList.get(0).getHostName()); + // Assumption that metrics are ordered by start time + metric.setStartTime(metricList.get(0).getStartTime()); + Map<Long, Double> metricRecords = new HashMap<Long, Double>(); + for (TimelineMetric timelineMetric : metricList) { + metricRecords.putAll(timelineMetric.getMetricValues()); + } + metric.setMetricValues(metricRecords); + } + + return metric; + } + + + @Override + public TimelinePutResponse putMetrics(TimelineMetrics metrics) + throws SQLException, IOException { + + // Error indicated by the Sql exception + TimelinePutResponse response = new TimelinePutResponse(); + + hBaseAccessor.insertMetricRecords(metrics); + + return response; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java new file mode 100644 index 0000000..61e15d7 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.annotate.JsonSubTypes; +import org.codehaus.jackson.map.ObjectMapper; + +import java.io.IOException; + +/** +* +*/ +@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class), + @JsonSubTypes.Type(value = MetricHostAggregate.class)}) +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class MetricAggregate { + private static final ObjectMapper mapper = new ObjectMapper(); + + protected Double sum = 0.0; + protected Double deviation; + protected Double max = Double.MIN_VALUE; + protected Double min = Double.MAX_VALUE; + + public MetricAggregate() { + } + + MetricAggregate(Double sum, Double deviation, Double max, + Double min) { + this.sum = sum; + this.deviation = deviation; + this.max = max; + this.min = min; + } + + void updateSum(Double sum) { + this.sum += sum; + } + + void updateMax(Double max) { + if (max > this.max) { + this.max = max; + } + } + + void updateMin(Double min) { + if (min < this.min) { + this.min = min; + } + } + + @JsonProperty("sum") + Double getSum() { + return sum; + } + + @JsonProperty("deviation") + Double getDeviation() { + return deviation; + } + + @JsonProperty("max") + Double getMax() { + return max; + } + + @JsonProperty("min") + Double getMin() { + return min; + } + + public void setSum(Double sum) { + this.sum = sum; + } + + public void setDeviation(Double deviation) { + this.deviation = deviation; + } + + public void setMax(Double max) { + this.max = max; + } + + public void setMin(Double min) { + this.min = min; + } + + public String toJSON() throws IOException { + return mapper.writeValueAsString(this); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java new file mode 100644 index 0000000..c13c85f --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +/** +* +*/ +public class MetricClusterAggregate extends MetricAggregate { + private int numberOfHosts; + + @JsonCreator + public MetricClusterAggregate() { + } + + MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation, + Double max, Double min) { + super(sum, deviation, max, min); + this.numberOfHosts = numberOfHosts; + } + + @JsonProperty("numberOfHosts") + int getNumberOfHosts() { + return numberOfHosts; + } + + void updateNumberOfHosts(int count) { + this.numberOfHosts += count; + } + + public void setNumberOfHosts(int numberOfHosts) { + this.numberOfHosts = numberOfHosts; + } + + /** + * Find and update min, max and avg for a minute + */ + void updateAggregates(MetricClusterAggregate hostAggregate) { + updateMax(hostAggregate.getMax()); + updateMin(hostAggregate.getMin()); + updateSum(hostAggregate.getSum()); + updateNumberOfHosts(hostAggregate.getNumberOfHosts()); + } + + @Override + public String toString() { +// MetricClusterAggregate + return "MetricAggregate{" + + "sum=" + sum + + ", numberOfHosts=" + numberOfHosts + + ", deviation=" + deviation + + ", max=" + max + + ", min=" + min + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java new file mode 100644 index 0000000..02cc207 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * Represents a collection of minute based aggregation of values for + * resolution greater than a minute. + */ +public class MetricHostAggregate extends MetricAggregate { + + private long numberOfSamples = 0; + + @JsonCreator + public MetricHostAggregate() { + super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE); + } + + public MetricHostAggregate(Double sum, int numberOfSamples, + Double deviation, + Double max, Double min) { + super(sum, deviation, max, min); + this.numberOfSamples = numberOfSamples; + } + + @JsonProperty("numberOfSamples") + long getNumberOfSamples() { + return numberOfSamples == 0 ? 1 : numberOfSamples; + } + + void updateNumberOfSamples(long count) { + this.numberOfSamples += count; + } + + public void setNumberOfSamples(long numberOfSamples) { + this.numberOfSamples = numberOfSamples; + } + + public double getAvg() { + return sum / numberOfSamples; + } + + /** + * Find and update min, max and avg for a minute + */ + void updateAggregates(MetricHostAggregate hostAggregate) { + updateMax(hostAggregate.getMax()); + updateMin(hostAggregate.getMin()); + updateSum(hostAggregate.getSum()); + updateNumberOfSamples(hostAggregate.getNumberOfSamples()); + } + + @Override + public String toString() { + return "MetricHostAggregate{" + + "sum=" + sum + + ", numberOfSamples=" + numberOfSamples + + ", deviation=" + deviation + + ", max=" + max + + ", min=" + min + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java new file mode 100644 index 0000000..88a427a --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline; + +/** + * RuntimeException for initialization of metrics schema. It is RuntimeException + * since this is a not recoverable situation, and should be handled by main or + * service method followed by shutdown. + */ +public class MetricsInitializationException extends RuntimeException { + public MetricsInitializationException() { + } + + public MetricsInitializationException(String msg) { + super(msg); + } + + public MetricsInitializationException(Throwable t) { + super(t); + } + + public MetricsInitializationException(String msg, Throwable t) { + super(msg, t); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java new file mode 100644 index 0000000..4f248b7 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -0,0 +1,678 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_ENCODING; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_METRICS_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL; + +/** + * Provides a facade over the Phoenix API to access HBase schema + */ +public class PhoenixHBaseAccessor { + + private static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class); + private final Configuration hbaseConf; + private final Configuration metricsConf; + private final RetryCounterFactory retryCounterFactory; + + static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000; + /** + * 4 metrics/min * 60 * 24: Retrieve data for 1 day. + */ + private static final int METRICS_PER_MINUTE = 4; + public static int RESULTSET_LIMIT = (int)TimeUnit.DAYS.toMinutes(1) * + METRICS_PER_MINUTE; + private static ObjectMapper mapper = new ObjectMapper(); + + private static TypeReference<Map<Long, Double>> metricValuesTypeRef = + new TypeReference<Map<Long, Double>>() {}; + private final ConnectionProvider dataSource; + + public PhoenixHBaseAccessor(Configuration hbaseConf, + Configuration metricsConf){ + this(hbaseConf, metricsConf, new DefaultPhoenixDataSource(hbaseConf)); + } + + public PhoenixHBaseAccessor(Configuration hbaseConf, + Configuration metricsConf, + ConnectionProvider dataSource) { + this.hbaseConf = hbaseConf; + this.metricsConf = metricsConf; + RESULTSET_LIMIT = metricsConf.getInt(GLOBAL_RESULT_LIMIT, 5760); + try { + Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); + } catch (ClassNotFoundException e) { + LOG.error("Phoenix client jar not found in the classpath.", e); + throw new IllegalStateException(e); + } + this.dataSource = dataSource; + this.retryCounterFactory = new RetryCounterFactory( + metricsConf.getInt(GLOBAL_MAX_RETRIES, 10), + (int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 5))); + } + + + private Connection getConnectionRetryingOnException() + throws SQLException, InterruptedException { + RetryCounter retryCounter = retryCounterFactory.create(); + while (true) { + try{ + return getConnection(); + } catch (SQLException e) { + if(!retryCounter.shouldRetry()){ + LOG.error("HBaseAccessor getConnection failed after " + + retryCounter.getMaxAttempts() + " attempts"); + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + } + } + + + /** + * Get JDBC connection to HBase store. Assumption is that the hbase + * configuration is present on the classpath and loaded by the caller into + * the Configuration object. + * Phoenix already caches the HConnection between the client and HBase + * cluster. + * + * @return @java.sql.Connection + */ + public Connection getConnection() throws SQLException { + return dataSource.getConnection(); + } + + public static Map readMetricFromJSON(String json) throws IOException { + return mapper.readValue(json, metricValuesTypeRef); + } + + @SuppressWarnings("unchecked") + static TimelineMetric getTimelineMetricFromResultSet(ResultSet rs) + throws SQLException, IOException { + TimelineMetric metric = new TimelineMetric(); + metric.setMetricName(rs.getString("METRIC_NAME")); + metric.setAppId(rs.getString("APP_ID")); + metric.setInstanceId(rs.getString("INSTANCE_ID")); + metric.setHostName(rs.getString("HOSTNAME")); + metric.setTimestamp(rs.getLong("SERVER_TIME")); + metric.setStartTime(rs.getLong("START_TIME")); + metric.setType(rs.getString("UNITS")); + metric.setMetricValues( + (Map<Long, Double>) readMetricFromJSON(rs.getString("METRICS"))); + return metric; + } + + static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs) + throws SQLException, IOException { + TimelineMetric metric = new TimelineMetric(); + metric.setMetricName(rs.getString("METRIC_NAME")); + metric.setAppId(rs.getString("APP_ID")); + metric.setInstanceId(rs.getString("INSTANCE_ID")); + metric.setHostName(rs.getString("HOSTNAME")); + metric.setTimestamp(rs.getLong("SERVER_TIME")); + metric.setType(rs.getString("UNITS")); + return metric; + } + + static MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs) + throws SQLException { + MetricHostAggregate metricHostAggregate = new MetricHostAggregate(); + metricHostAggregate.setSum(rs.getDouble("METRIC_SUM")); + metricHostAggregate.setMax(rs.getDouble("METRIC_MAX")); + metricHostAggregate.setMin(rs.getDouble("METRIC_MIN")); + metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT")); + + metricHostAggregate.setDeviation(0.0); + return metricHostAggregate; + } + + static TimelineClusterMetric + getTimelineMetricClusterKeyFromResultSet(ResultSet rs) + throws SQLException, IOException { + TimelineClusterMetric metric = new TimelineClusterMetric( + rs.getString("METRIC_NAME"), + rs.getString("APP_ID"), + rs.getString("INSTANCE_ID"), + rs.getLong("SERVER_TIME"), + rs.getString("UNITS")); + + return metric; + } + + static MetricClusterAggregate + getMetricClusterAggregateFromResultSet(ResultSet rs) + throws SQLException { + MetricClusterAggregate agg = new MetricClusterAggregate(); + agg.setSum(rs.getDouble("METRIC_SUM")); + agg.setMax(rs.getDouble("METRIC_MAX")); + agg.setMin(rs.getDouble("METRIC_MIN")); + agg.setNumberOfHosts(rs.getInt("HOSTS_COUNT")); + + agg.setDeviation(0.0); + + return agg; + } + + protected void initMetricSchema() { + Connection conn = null; + Statement stmt = null; + + String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING); + String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, DEFAULT_TABLE_COMPRESSION); + String precisionTtl = metricsConf.get(PRECISION_TABLE_TTL, "86400"); + String hostMinTtl = metricsConf.get(HOST_MINUTE_TABLE_TTL, "604800"); + String hostHourTtl = metricsConf.get(HOST_HOUR_TABLE_TTL, "2592000"); + String clusterMinTtl = metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "2592000"); + String clusterHourTtl = metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "31536000"); + + try { + LOG.info("Initializing metrics schema..."); + conn = getConnectionRetryingOnException(); + stmt = conn.createStatement(); + + stmt.executeUpdate(String.format(CREATE_METRICS_TABLE_SQL, + encoding, precisionTtl, compression)); + stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL, + encoding, hostHourTtl, compression)); + stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL, + encoding, hostMinTtl, compression)); + stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL, + encoding, clusterMinTtl, compression)); + stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL, + encoding, clusterHourTtl, compression)); + conn.commit(); + } catch (SQLException sql) { + LOG.warn("Error creating Metrics Schema in HBase using Phoenix.", sql); + throw new MetricsInitializationException( + "Error creating Metrics Schema in HBase using Phoenix.", sql); + } catch (InterruptedException e) { + LOG.warn("Error creating Metrics Schema in HBase using Phoenix.", e); + throw new MetricsInitializationException( + "Error creating Metrics Schema in HBase using Phoenix.", e); + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException e) { + // Ignore + } + } + } + } + + public void insertMetricRecords(TimelineMetrics metrics) + throws SQLException, IOException { + + List<TimelineMetric> timelineMetrics = metrics.getMetrics(); + if (timelineMetrics == null || timelineMetrics.isEmpty()) { + LOG.debug("Empty metrics insert request."); + return; + } + + Connection conn = getConnection(); + PreparedStatement metricRecordStmt = null; + long currentTime = System.currentTimeMillis(); + + try { + metricRecordStmt = conn.prepareStatement(String.format( + UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME)); + + for (TimelineMetric metric : timelineMetrics) { + metricRecordStmt.clearParameters(); + + LOG.trace("host: " + metric.getHostName() + ", " + + "metricName = " + metric.getMetricName() + ", " + + "values: " + metric.getMetricValues()); + Aggregator agg = new Aggregator(); + double[] aggregates = agg.calculateAggregates( + metric.getMetricValues()); + + metricRecordStmt.setString(1, metric.getMetricName()); + metricRecordStmt.setString(2, metric.getHostName()); + metricRecordStmt.setString(3, metric.getAppId()); + metricRecordStmt.setString(4, metric.getInstanceId()); + metricRecordStmt.setLong(5, currentTime); + metricRecordStmt.setLong(6, metric.getStartTime()); + metricRecordStmt.setString(7, metric.getType()); + metricRecordStmt.setDouble(8, aggregates[0]); + metricRecordStmt.setDouble(9, aggregates[1]); + metricRecordStmt.setDouble(10, aggregates[2]); + metricRecordStmt.setLong(11, (long)aggregates[3]); + String json = + TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues()); + metricRecordStmt.setString(12, json); + + try { + metricRecordStmt.executeUpdate(); + } catch (SQLException sql) { + LOG.error(sql); + } + } + + conn.commit(); + + } finally { + if (metricRecordStmt != null) { + try { + metricRecordStmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } + } + } + } + + + @SuppressWarnings("unchecked") + public TimelineMetrics getMetricRecords(final Condition condition) + throws SQLException, IOException { + + if (condition.isEmpty()) { + throw new SQLException("No filter criteria specified."); + } + + Connection conn = getConnection(); + PreparedStatement stmt = null; + TimelineMetrics metrics = new TimelineMetrics(); + + try { + stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + + ResultSet rs = stmt.executeQuery(); + + while (rs.next()) { + TimelineMetric metric = getTimelineMetricFromResultSet(rs); + + if (condition.isGrouped()) { + metrics.addOrMergeTimelineMetric(metric); + } else { + metrics.getMetrics().add(metric); + } + } + + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } + } + } + return metrics; + } + + public void saveHostAggregateRecords(Map<TimelineMetric, + MetricHostAggregate> hostAggregateMap, String phoenixTableName) + throws SQLException { + + if (hostAggregateMap != null && !hostAggregateMap.isEmpty()) { + Connection conn = getConnection(); + PreparedStatement stmt = null; + + long start = System.currentTimeMillis(); + int rowCount = 0; + + try { + stmt = conn.prepareStatement( + String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName)); + + for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate : + hostAggregateMap.entrySet()) { + + TimelineMetric metric = metricAggregate.getKey(); + MetricHostAggregate hostAggregate = metricAggregate.getValue(); + + rowCount++; + stmt.clearParameters(); + stmt.setString(1, metric.getMetricName()); + stmt.setString(2, metric.getHostName()); + stmt.setString(3, metric.getAppId()); + stmt.setString(4, metric.getInstanceId()); + stmt.setLong(5, metric.getTimestamp()); + stmt.setString(6, metric.getType()); + stmt.setDouble(7, hostAggregate.getSum()); + stmt.setDouble(8, hostAggregate.getMax()); + stmt.setDouble(9, hostAggregate.getMin()); + stmt.setDouble(10, hostAggregate.getNumberOfSamples()); + + try { + // TODO: Why this exception is swallowed + stmt.executeUpdate(); + } catch (SQLException sql) { + LOG.error(sql); + } + + if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) { + conn.commit(); + rowCount = 0; + } + + } + + conn.commit(); + + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } + } + } + + long end = System.currentTimeMillis(); + + if ((end - start) > 60000l) { + LOG.info("Time to save map: " + (end - start) + ", " + + "thread = " + Thread.currentThread().getClass()); + } + } + } + + /** + * Save Metric aggregate records. + * + * @throws SQLException + */ + public void saveClusterAggregateRecords( + Map<TimelineClusterMetric, MetricClusterAggregate> records) + throws SQLException { + + if (records == null || records.isEmpty()) { + LOG.debug("Empty aggregate records."); + return; + } + + long start = System.currentTimeMillis(); + + Connection conn = getConnection(); + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL); + int rowCount = 0; + + for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> + aggregateEntry : records.entrySet()) { + TimelineClusterMetric clusterMetric = aggregateEntry.getKey(); + MetricClusterAggregate aggregate = aggregateEntry.getValue(); + + LOG.trace("clusterMetric = " + clusterMetric + ", " + + "aggregate = " + aggregate); + + rowCount++; + stmt.clearParameters(); + stmt.setString(1, clusterMetric.getMetricName()); + stmt.setString(2, clusterMetric.getAppId()); + stmt.setString(3, clusterMetric.getInstanceId()); + stmt.setLong(4, clusterMetric.getTimestamp()); + stmt.setString(5, clusterMetric.getType()); + stmt.setDouble(6, aggregate.getSum()); + stmt.setInt(7, aggregate.getNumberOfHosts()); + stmt.setDouble(8, aggregate.getMax()); + stmt.setDouble(9, aggregate.getMin()); + + try { + stmt.executeUpdate(); + } catch (SQLException sql) { + // TODO: Why this exception is swallowed + LOG.error(sql); + } + + if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) { + conn.commit(); + rowCount = 0; + } + } + + conn.commit(); + + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } + } + } + long end = System.currentTimeMillis(); + if ((end - start) > 60000l) { + LOG.info("Time to save: " + (end - start) + ", " + + "thread = " + Thread.currentThread().getName()); + } + } + + /** + * Save Metric aggregate records. + * + * @throws SQLException + */ + public void saveClusterAggregateHourlyRecords( + Map<TimelineClusterMetric, MetricHostAggregate> records, + String tableName) + throws SQLException { + if (records == null || records.isEmpty()) { + LOG.debug("Empty aggregate records."); + return; + } + + long start = System.currentTimeMillis(); + + Connection conn = getConnection(); + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(String.format + (UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName)); + int rowCount = 0; + + for (Map.Entry<TimelineClusterMetric, MetricHostAggregate> + aggregateEntry : records.entrySet()) { + TimelineClusterMetric clusterMetric = aggregateEntry.getKey(); + MetricHostAggregate aggregate = aggregateEntry.getValue(); + + LOG.trace("clusterMetric = " + clusterMetric + ", " + + "aggregate = " + aggregate); + + rowCount++; + stmt.clearParameters(); + stmt.setString(1, clusterMetric.getMetricName()); + stmt.setString(2, clusterMetric.getAppId()); + stmt.setString(3, clusterMetric.getInstanceId()); + stmt.setLong(4, clusterMetric.getTimestamp()); + stmt.setString(5, clusterMetric.getType()); + stmt.setDouble(6, aggregate.getSum()); +// stmt.setInt(7, aggregate.getNumberOfHosts()); + stmt.setLong(7, aggregate.getNumberOfSamples()); + stmt.setDouble(8, aggregate.getMax()); + stmt.setDouble(9, aggregate.getMin()); + + try { + stmt.executeUpdate(); + } catch (SQLException sql) { + // we have no way to verify it works!!! + LOG.error(sql); + } + + if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) { + conn.commit(); + rowCount = 0; + } + } + + conn.commit(); + + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } + } + } + long end = System.currentTimeMillis(); + if ((end - start) > 60000l) { + LOG.info("Time to save: " + (end - start) + ", " + + "thread = " + Thread.currentThread().getName()); + } + } + + + public TimelineMetrics getAggregateMetricRecords(final Condition condition) + throws SQLException { + + if (condition.isEmpty()) { + throw new SQLException("No filter criteria specified."); + } + + Connection conn = getConnection(); + PreparedStatement stmt = null; + TimelineMetrics metrics = new TimelineMetrics(); + + try { + stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition); + + ResultSet rs = stmt.executeQuery(); + + while (rs.next()) { + TimelineMetric metric = new TimelineMetric(); + metric.setMetricName(rs.getString("METRIC_NAME")); + metric.setAppId(rs.getString("APP_ID")); + metric.setInstanceId(rs.getString("INSTANCE_ID")); + metric.setTimestamp(rs.getLong("SERVER_TIME")); + metric.setStartTime(rs.getLong("SERVER_TIME")); + Map<Long, Double> valueMap = new HashMap<Long, Double>(); + valueMap.put(rs.getLong("SERVER_TIME"), + rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT")); + metric.setMetricValues(valueMap); + + if (condition.isGrouped()) { + metrics.addOrMergeTimelineMetric(metric); + } else { + metrics.getMetrics().add(metric); + } + } + + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } + } + } + LOG.info("Aggregate records size: " + metrics.getMetrics().size()); + return metrics; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java new file mode 100644 index 0000000..0d53f5f --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java @@ -0,0 +1,528 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +/** + * Encapsulate all metrics related SQL queries. + */ +public class PhoenixTransactSQL { + + static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class); + // TODO: Configurable TTL values + /** + * Create table to store individual metric records. + */ + public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " + + "EXISTS METRIC_RECORD (METRIC_NAME VARCHAR, " + + "HOSTNAME VARCHAR, " + + "SERVER_TIME UNSIGNED_LONG NOT NULL, " + + "APP_ID VARCHAR, " + + "INSTANCE_ID VARCHAR, " + + "START_TIME UNSIGNED_LONG, " + + "UNITS CHAR(20), " + + "METRIC_SUM DOUBLE, " + + "METRIC_COUNT UNSIGNED_INT, " + + "METRIC_MAX DOUBLE, " + + "METRIC_MIN DOUBLE, " + + "METRICS VARCHAR CONSTRAINT pk " + + "PRIMARY KEY (METRIC_NAME, HOSTNAME, SERVER_TIME, APP_ID, " + + "INSTANCE_ID)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + + "TTL=%s, COMPRESSION='%s'"; + + public static final String CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS METRIC_RECORD_HOURLY " + + "(METRIC_NAME VARCHAR, " + + "HOSTNAME VARCHAR, " + + "APP_ID VARCHAR, " + + "INSTANCE_ID VARCHAR, " + + "SERVER_TIME UNSIGNED_LONG NOT NULL, " + + "UNITS CHAR(20), " + + "METRIC_SUM DOUBLE," + + "METRIC_COUNT UNSIGNED_INT, " + + "METRIC_MAX DOUBLE," + + "METRIC_MIN DOUBLE CONSTRAINT pk " + + "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " + + "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + + "TTL=%s, COMPRESSION='%s'"; + + public static final String CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS METRIC_RECORD_MINUTE " + + "(METRIC_NAME VARCHAR, " + + "HOSTNAME VARCHAR, " + + "APP_ID VARCHAR, " + + "INSTANCE_ID VARCHAR, " + + "SERVER_TIME UNSIGNED_LONG NOT NULL, " + + "UNITS CHAR(20), " + + "METRIC_SUM DOUBLE," + + "METRIC_COUNT UNSIGNED_INT, " + + "METRIC_MAX DOUBLE," + + "METRIC_MIN DOUBLE CONSTRAINT pk " + + "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " + + "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," + + " COMPRESSION='%s'"; + + public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE " + + "(METRIC_NAME VARCHAR, " + + "APP_ID VARCHAR, " + + "INSTANCE_ID VARCHAR, " + + "SERVER_TIME UNSIGNED_LONG NOT NULL, " + + "UNITS CHAR(20), " + + "METRIC_SUM DOUBLE, " + + "HOSTS_COUNT UNSIGNED_INT, " + + "METRIC_MAX DOUBLE, " + + "METRIC_MIN DOUBLE " + + "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " + + "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + + "TTL=%s, COMPRESSION='%s'"; + + public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE_HOURLY " + + "(METRIC_NAME VARCHAR, " + + "APP_ID VARCHAR, " + + "INSTANCE_ID VARCHAR, " + + "SERVER_TIME UNSIGNED_LONG NOT NULL, " + + "UNITS CHAR(20), " + + "METRIC_SUM DOUBLE, " + + "METRIC_COUNT UNSIGNED_INT, " + + "METRIC_MAX DOUBLE, " + + "METRIC_MIN DOUBLE " + + "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " + + "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + + "TTL=%s, COMPRESSION='%s'"; + + /** + * Insert into metric records table. + */ + public static final String UPSERT_METRICS_SQL = "UPSERT INTO %s " + + "(METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, " + + "UNITS, " + + "METRIC_SUM, " + + "METRIC_MAX, " + + "METRIC_MIN, " + + "METRIC_COUNT, " + + "METRICS) VALUES " + + "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " + + "METRIC_AGGREGATE (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " + + "UNITS, " + + "METRIC_SUM, " + + "HOSTS_COUNT, " + + "METRIC_MAX, " + + "METRIC_MIN) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + public static final String UPSERT_CLUSTER_AGGREGATE_TIME_SQL = "UPSERT INTO" + + " %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " + + "UNITS, " + + "METRIC_SUM, " + + "METRIC_COUNT, " + + "METRIC_MAX, " + + "METRIC_MIN) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + + public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " + + "%s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " + + "SERVER_TIME, " + + "UNITS, " + + "METRIC_SUM, " + + "METRIC_MAX, " + + "METRIC_MIN," + + "METRIC_COUNT) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + /** + * Retrieve a set of rows from metrics records table. + */ + public static final String GET_METRIC_SQL = "SELECT %s METRIC_NAME, " + + "HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, UNITS, " + + "METRIC_SUM, " + + "METRIC_MAX, " + + "METRIC_MIN, " + + "METRIC_COUNT, " + + "METRICS " + + "FROM %s"; + + public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT %s " + + "METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, " + + "UNITS, " + + "METRIC_SUM, " + + "METRIC_MAX, " + + "METRIC_MIN, " + + "METRIC_COUNT " + + "FROM %s"; + + public static final String GET_CLUSTER_AGGREGATE_SQL = "SELECT %s " + + "METRIC_NAME, APP_ID, " + + "INSTANCE_ID, SERVER_TIME, " + + "UNITS, " + + "METRIC_SUM, " + + "HOSTS_COUNT, " + + "METRIC_MAX, " + + "METRIC_MIN " + + "FROM METRIC_AGGREGATE"; + + public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD"; + public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME = + "METRIC_RECORD_MINUTE"; + public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME = + "METRIC_RECORD_HOURLY"; + public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME = + "METRIC_AGGREGATE"; + public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME = + "METRIC_AGGREGATE_HOURLY"; + public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY"; + public static final String DEFAULT_ENCODING = "FAST_DIFF"; + public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes + + /** Filter to optimize HBase scan by using file timestamps. This prevents + * a full table scan of metric records. + * @return Phoenix Hint String + */ + public static String getNaiveTimeRangeHint(Long startTime, Long delta) { + return String.format("/*+ NATIVE_TIME_RANGE(%s) */", (startTime - delta)); + } + + public static PreparedStatement prepareGetMetricsSqlStmt( + Connection connection, Condition condition) throws SQLException { + + if (condition.isEmpty()) { + throw new IllegalArgumentException("Condition is empty."); + } + String stmtStr; + if (condition.getStatement() != null) { + stmtStr = condition.getStatement(); + } else { + stmtStr = String.format(GET_METRIC_SQL, + getNaiveTimeRangeHint(condition.getStartTime(), NATIVE_TIME_RANGE_DELTA), + METRICS_RECORD_TABLE_NAME); + } + + StringBuilder sb = new StringBuilder(stmtStr); + sb.append(" WHERE "); + sb.append(condition.getConditionClause()); + String orderByClause = condition.getOrderByClause(); + + if (orderByClause != null) { + sb.append(orderByClause); + } else { + sb.append(" ORDER BY METRIC_NAME, SERVER_TIME "); + } + if (condition.getLimit() != null) { + sb.append(" LIMIT ").append(condition.getLimit()); + } + + LOG.debug("SQL: " + sb.toString() + ", condition: " + condition); + PreparedStatement stmt = connection.prepareStatement(sb.toString()); + int pos = 1; + if (condition.getMetricNames() != null) { + for (; pos <= condition.getMetricNames().size(); pos++) { + LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1)); + stmt.setString(pos, condition.getMetricNames().get(pos - 1)); + } + } + if (condition.getHostname() != null) { + LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname()); + stmt.setString(pos++, condition.getHostname()); + } + // TODO: Upper case all strings on POST + if (condition.getAppId() != null) { + // TODO: fix case of appId coming from host metrics + String appId = condition.getAppId(); + if (!condition.getAppId().equals("HOST")) { + appId = appId.toLowerCase(); + } + LOG.debug("Setting pos: " + pos + ", value: " + appId); + stmt.setString(pos++, appId); + } + if (condition.getInstanceId() != null) { + LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId()); + stmt.setString(pos++, condition.getInstanceId()); + } + if (condition.getStartTime() != null) { + LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime()); + stmt.setLong(pos++, condition.getStartTime()); + } + if (condition.getEndTime() != null) { + LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime()); + stmt.setLong(pos, condition.getEndTime()); + } + if (condition.getFetchSize() != null) { + stmt.setFetchSize(condition.getFetchSize()); + } + + return stmt; + } + + + public static PreparedStatement prepareGetAggregateSqlStmt( + Connection connection, Condition condition) throws SQLException { + + if (condition.isEmpty()) { + throw new IllegalArgumentException("Condition is empty."); + } + + StringBuilder sb = new StringBuilder(GET_CLUSTER_AGGREGATE_SQL); + sb.append(" WHERE "); + sb.append(condition.getConditionClause()); + sb.append(" ORDER BY METRIC_NAME, SERVER_TIME"); + if (condition.getLimit() != null) { + sb.append(" LIMIT ").append(condition.getLimit()); + } + + LOG.debug("SQL => " + sb.toString() + ", condition => " + condition); + PreparedStatement stmt = connection.prepareStatement(sb.toString()); + int pos = 1; + if (condition.getMetricNames() != null) { + for (; pos <= condition.getMetricNames().size(); pos++) { + stmt.setString(pos, condition.getMetricNames().get(pos - 1)); + } + } + // TODO: Upper case all strings on POST + if (condition.getAppId() != null) { + stmt.setString(pos++, condition.getAppId().toLowerCase()); + } + if (condition.getInstanceId() != null) { + stmt.setString(pos++, condition.getInstanceId()); + } + if (condition.getStartTime() != null) { + stmt.setLong(pos++, condition.getStartTime()); + } + if (condition.getEndTime() != null) { + stmt.setLong(pos, condition.getEndTime()); + } + + return stmt; + } + + static class Condition { + List<String> metricNames; + String hostname; + String appId; + String instanceId; + Long startTime; + Long endTime; + Integer limit; + boolean grouped; + boolean noLimit = false; + Integer fetchSize; + String statement; + Set<String> orderByColumns = new LinkedHashSet<String>(); + + Condition(List<String> metricNames, String hostname, String appId, + String instanceId, Long startTime, Long endTime, Integer limit, + boolean grouped) { + this.metricNames = metricNames; + this.hostname = hostname; + this.appId = appId; + this.instanceId = instanceId; + this.startTime = startTime; + this.endTime = endTime; + this.limit = limit; + this.grouped = grouped; + } + + String getStatement() { + return statement; + } + + void setStatement(String statement) { + this.statement = statement; + } + + List<String> getMetricNames() { + return metricNames == null || metricNames.isEmpty() ? null : metricNames; + } + + String getMetricsClause() { + StringBuilder sb = new StringBuilder("("); + if (metricNames != null) { + for (String name : metricNames) { + if (sb.length() != 1) { + sb.append(", "); + } + sb.append("?"); + } + sb.append(")"); + return sb.toString(); + } else { + return null; + } + } + + String getConditionClause() { + StringBuilder sb = new StringBuilder(); + boolean appendConjunction = false; + + if (getMetricNames() != null) { + sb.append("METRIC_NAME IN "); + sb.append(getMetricsClause()); + appendConjunction = true; + } + if (appendConjunction) { + sb.append(" AND"); + } + appendConjunction = false; + if (getHostname() != null) { + sb.append(" HOSTNAME = ?"); + appendConjunction = true; + } + if (appendConjunction) { + sb.append(" AND"); + } + appendConjunction = false; + if (getAppId() != null) { + sb.append(" APP_ID = ?"); + appendConjunction = true; + } + if (appendConjunction) { + sb.append(" AND"); + } + appendConjunction = false; + if (getInstanceId() != null) { + sb.append(" INSTANCE_ID = ?"); + appendConjunction = true; + } + if (appendConjunction) { + sb.append(" AND"); + } + appendConjunction = false; + if (getStartTime() != null) { + sb.append(" SERVER_TIME >= ?"); + appendConjunction = true; + } + if (appendConjunction) { + sb.append(" AND"); + } + if (getEndTime() != null) { + sb.append(" SERVER_TIME < ?"); + } + return sb.toString(); + } + + String getHostname() { + return hostname == null || hostname.isEmpty() ? null : hostname; + } + + String getAppId() { + return appId == null || appId.isEmpty() ? null : appId; + } + + String getInstanceId() { + return instanceId == null || instanceId.isEmpty() ? null : instanceId; + } + + /** + * Convert to millis. + */ + Long getStartTime() { + if (startTime < 9999999999l) { + return startTime * 1000; + } else { + return startTime; + } + } + + Long getEndTime() { + if (endTime < 9999999999l) { + return endTime * 1000; + } else { + return endTime; + } + } + + void setNoLimit() { + this.noLimit = true; + } + + Integer getLimit() { + if (noLimit) { + return null; + } + return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit; + } + + boolean isGrouped() { + return grouped; + } + + boolean isEmpty() { + return (metricNames == null || metricNames.isEmpty()) + && (hostname == null || hostname.isEmpty()) + && (appId == null || appId.isEmpty()) + && (instanceId == null || instanceId.isEmpty()) + && startTime == null + && endTime == null; + } + + Integer getFetchSize() { + return fetchSize; + } + + void setFetchSize(Integer fetchSize) { + this.fetchSize = fetchSize; + } + + void addOrderByColumn(String column) { + orderByColumns.add(column); + } + + String getOrderByClause() { + String orderByStr = " ORDER BY "; + if (!orderByColumns.isEmpty()) { + StringBuilder sb = new StringBuilder(orderByStr); + for (String orderByColumn : orderByColumns) { + if (sb.length() != orderByStr.length()) { + sb.append(", "); + } + sb.append(orderByColumn); + } + sb.append(" "); + return sb.toString(); + } + return null; + } + + @Override + public String toString() { + return "Condition{" + + "metricNames=" + metricNames + + ", hostname='" + hostname + '\'' + + ", appId='" + appId + '\'' + + ", instanceId='" + instanceId + '\'' + + ", startTime=" + startTime + + ", endTime=" + endTime + + ", limit=" + limit + + ", grouped=" + grouped + + ", orderBy=" + orderByColumns + + ", noLimit=" + noLimit + + '}'; + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java new file mode 100644 index 0000000..d227993 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + +public class TimelineClusterMetric { + private String metricName; + private String appId; + private String instanceId; + private long timestamp; + private String type; + + TimelineClusterMetric(String metricName, String appId, String instanceId, + long timestamp, String type) { + this.metricName = metricName; + this.appId = appId; + this.instanceId = instanceId; + this.timestamp = timestamp; + this.type = type; + } + + String getMetricName() { + return metricName; + } + + String getAppId() { + return appId; + } + + String getInstanceId() { + return instanceId; + } + + long getTimestamp() { + return timestamp; + } + + String getType() { return type; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TimelineClusterMetric that = (TimelineClusterMetric) o; + + if (timestamp != that.timestamp) return false; + if (appId != null ? !appId.equals(that.appId) : that.appId != null) + return false; + if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null) + return false; + if (!metricName.equals(that.metricName)) return false; + + return true; + } + + public boolean equalsExceptTime(TimelineClusterMetric metric) { + if (!metricName.equals(metric.metricName)) return false; + if (!appId.equals(metric.appId)) return false; + if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null) + return false; + + return true; + } + @Override + public int hashCode() { + int result = metricName.hashCode(); + result = 31 * result + (appId != null ? appId.hashCode() : 0); + result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0); + result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); + return result; + } + + @Override + public String toString() { + return "TimelineClusterMetric{" + + "metricName='" + metricName + '\'' + + ", appId='" + appId + '\'' + + ", instanceId='" + instanceId + '\'' + + ", timestamp=" + timestamp + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java new file mode 100644 index 0000000..cab154b --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL; + +public class TimelineMetricAggregator extends AbstractTimelineAggregator { + private static final Log LOG = LogFactory.getLog + (TimelineMetricAggregator.class); + + private final String checkpointLocation; + private final Long sleepIntervalMillis; + private final Integer checkpointCutOffMultiplier; + private final String hostAggregatorDisabledParam; + private final String tableName; + private final String outputTableName; + private final Long nativeTimeRangeDelay; + + public TimelineMetricAggregator(PhoenixHBaseAccessor hBaseAccessor, + Configuration metricsConf, + String checkpointLocation, + Long sleepIntervalMillis, + Integer checkpointCutOffMultiplier, + String hostAggregatorDisabledParam, + String tableName, + String outputTableName, + Long nativeTimeRangeDelay) { + super(hBaseAccessor, metricsConf); + this.checkpointLocation = checkpointLocation; + this.sleepIntervalMillis = sleepIntervalMillis; + this.checkpointCutOffMultiplier = checkpointCutOffMultiplier; + this.hostAggregatorDisabledParam = hostAggregatorDisabledParam; + this.tableName = tableName; + this.outputTableName = outputTableName; + this.nativeTimeRangeDelay = nativeTimeRangeDelay; + } + + @Override + protected String getCheckpointLocation() { + return checkpointLocation; + } + + @Override + protected void aggregate(ResultSet rs, long startTime, long endTime) + throws IOException, SQLException { + Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = + aggregateMetricsFromResultSet(rs); + + LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates."); + hBaseAccessor.saveHostAggregateRecords(hostAggregateMap, + outputTableName); + } + + @Override + protected Condition prepareMetricQueryCondition(long startTime, long endTime) { + Condition condition = new Condition(null, null, null, null, startTime, + endTime, null, true); + condition.setNoLimit(); + condition.setFetchSize(resultsetFetchSize); + condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay), + tableName)); + condition.addOrderByColumn("METRIC_NAME"); + condition.addOrderByColumn("HOSTNAME"); + condition.addOrderByColumn("APP_ID"); + condition.addOrderByColumn("INSTANCE_ID"); + condition.addOrderByColumn("SERVER_TIME"); + return condition; + } + + private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet + (ResultSet rs) throws IOException, SQLException { + TimelineMetric existingMetric = null; + MetricHostAggregate hostAggregate = null; + Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = + new HashMap<TimelineMetric, MetricHostAggregate>(); + + while (rs.next()) { + TimelineMetric currentMetric = + PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs); + MetricHostAggregate currentHostAggregate = + PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs); + + if (existingMetric == null) { + // First row + existingMetric = currentMetric; + hostAggregate = new MetricHostAggregate(); + hostAggregateMap.put(currentMetric, hostAggregate); + } + + if (existingMetric.equalsExceptTime(currentMetric)) { + // Recalculate totals with current metric + hostAggregate.updateAggregates(currentHostAggregate); + } else { + // Switched over to a new metric - save existing - create new aggregate + hostAggregate = new MetricHostAggregate(); + hostAggregate.updateAggregates(currentHostAggregate); + hostAggregateMap.put(currentMetric, hostAggregate); + existingMetric = currentMetric; + } + } + return hostAggregateMap; + } + + @Override + protected Long getSleepIntervalMillis() { + return sleepIntervalMillis; + } + + @Override + protected Integer getCheckpointCutOffMultiplier() { + return checkpointCutOffMultiplier; + } + + @Override + protected boolean isDisabled() { + return metricsConf.getBoolean(hostAggregatorDisabledParam, false); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java new file mode 100644 index 0000000..8b10079 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + +import static java.util.concurrent.TimeUnit.SECONDS; +import org.apache.commons.io.FilenameUtils; +import org.apache.hadoop.conf.Configuration; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; + +/** + * + */ +public class TimelineMetricAggregatorFactory { + private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE = + "timeline-metrics-host-aggregator-checkpoint"; + private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE = + "timeline-metrics-host-aggregator-hourly-checkpoint"; + + public static TimelineMetricAggregator createTimelineMetricAggregatorMinute + (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + String checkpointLocation = FilenameUtils.concat(checkpointDir, + MINUTE_AGGREGATE_CHECKPOINT_FILE); + long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l)); // 5 mins + + int checkpointCutOffMultiplier = metricsConf.getInt + (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3); + String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED; + + String inputTableName = METRICS_RECORD_TABLE_NAME; + String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME; + + return new TimelineMetricAggregator(hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + hostAggregatorDisabledParam, + inputTableName, + outputTableName, + 120000l); + } + + public static TimelineMetricAggregator createTimelineMetricAggregatorHourly + (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + String checkpointLocation = FilenameUtils.concat(checkpointDir, + MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE); + long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l)); + + int checkpointCutOffMultiplier = metricsConf.getInt + (HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2); + String hostAggregatorDisabledParam = HOST_AGGREGATOR_HOUR_DISABLED; + + String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME; + String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME; + + return new TimelineMetricAggregator(hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + hostAggregatorDisabledParam, + inputTableName, + outputTableName, + 3600000l); + } + + +}