AMBARI-10290. Expose avaialble host metrics across hostcomponents. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b93452ed Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b93452ed Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b93452ed Branch: refs/heads/trunk Commit: b93452edab3d93a7217751192145eab3944876c1 Parents: 81f311b Author: Siddharth Wagle <swa...@hortonworks.com> Authored: Tue Mar 31 16:07:12 2015 -0700 Committer: Siddharth Wagle <swa...@hortonworks.com> Committed: Thu Apr 2 14:55:29 2015 -0700 ---------------------------------------------------------------------- .../timeline/AbstractTimelineAggregator.java | 271 ------ .../metrics/timeline/AggregatorUtils.java | 59 -- .../metrics/timeline/ConnectionProvider.java | 29 - .../timeline/DefaultPhoenixDataSource.java | 77 -- .../metrics/timeline/Function.java | 169 ---- .../timeline/HBaseTimelineMetricStore.java | 10 +- .../metrics/timeline/MetricAggregate.java | 110 --- .../timeline/MetricClusterAggregate.java | 74 -- .../metrics/timeline/MetricHostAggregate.java | 81 -- .../metrics/timeline/PhoenixHBaseAccessor.java | 61 +- .../metrics/timeline/PhoenixTransactSQL.java | 970 ------------------- .../metrics/timeline/TimelineClusterMetric.java | 97 -- .../timeline/TimelineClusterMetricReader.java | 42 - .../timeline/TimelineMetricAggregator.java | 145 --- .../TimelineMetricAggregatorFactory.java | 99 -- .../TimelineMetricClusterAggregator.java | 223 ----- .../TimelineMetricClusterAggregatorHourly.java | 177 ---- .../timeline/TimelineMetricConfiguration.java | 6 + .../metrics/timeline/TimelineMetricReader.java | 65 -- .../aggregators/AbstractTimelineAggregator.java | 270 ++++++ .../timeline/aggregators/AggregatorUtils.java | 59 ++ .../metrics/timeline/aggregators/Function.java | 169 ++++ .../timeline/aggregators/MetricAggregate.java | 110 +++ .../aggregators/MetricClusterAggregate.java | 73 ++ .../aggregators/MetricHostAggregate.java | 81 ++ .../aggregators/TimelineClusterMetric.java | 97 ++ .../TimelineClusterMetricReader.java | 42 + .../aggregators/TimelineMetricAggregator.java | 147 +++ .../TimelineMetricAggregatorFactory.java | 98 ++ .../TimelineMetricAppAggregator.java | 169 ++++ .../TimelineMetricClusterAggregator.java | 235 +++++ .../TimelineMetricClusterAggregatorHourly.java | 175 ++++ .../aggregators/TimelineMetricReadHelper.java | 66 ++ .../metrics/timeline/query/Condition.java | 46 + .../timeline/query/ConnectionProvider.java | 29 + .../timeline/query/DefaultCondition.java | 258 +++++ .../query/DefaultPhoenixDataSource.java | 77 ++ .../timeline/query/PhoenixTransactSQL.java | 573 +++++++++++ .../query/SplitByMetricNamesCondition.java | 165 ++++ .../TestApplicationHistoryServer.java | 3 +- .../timeline/AbstractMiniHBaseClusterTest.java | 3 +- .../AbstractTimelineAggregatorTest.java | 9 +- .../metrics/timeline/FunctionTest.java | 7 +- .../timeline/HBaseTimelineMetricStoreTest.java | 5 +- .../metrics/timeline/ITClusterAggregator.java | 78 +- .../metrics/timeline/ITMetricAggregator.java | 18 +- .../timeline/ITPhoenixHBaseAccessor.java | 23 +- .../metrics/timeline/MetricTestHelper.java | 20 +- .../timeline/TestMetricHostAggregate.java | 1 + .../timeline/TestPhoenixTransactSQL.java | 11 +- .../0.1.0/configuration/ams-site.xml | 8 + 51 files changed, 3130 insertions(+), 2760 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java deleted file mode 100644 index 4af3db7..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java +++ /dev/null @@ -1,271 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - -import org.apache.commons.io.FileUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.SystemClock; - -import java.io.File; -import java.io.IOException; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.Date; - -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE; - -public abstract class AbstractTimelineAggregator implements Runnable { - protected final PhoenixHBaseAccessor hBaseAccessor; - private final Log LOG; - - private Clock clock; - protected final long checkpointDelayMillis; - protected final Integer resultsetFetchSize; - protected Configuration metricsConf; - - public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor, - Configuration metricsConf) { - this(hBaseAccessor, metricsConf, new SystemClock()); - } - - public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor, - Configuration metricsConf, Clock clk) { - this.hBaseAccessor = hBaseAccessor; - this.metricsConf = metricsConf; - this.checkpointDelayMillis = SECONDS.toMillis( - metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120)); - this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000); - this.LOG = LogFactory.getLog(this.getClass()); - this.clock = clk; - } - - @Override - public void run() { - LOG.info("Started Timeline aggregator thread @ " + new Date()); - Long SLEEP_INTERVAL = getSleepIntervalMillis(); - - while (true) { - long sleepTime = runOnce(SLEEP_INTERVAL); - - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - LOG.info("Sleep interrupted, continuing with aggregation."); - } - } - } - - /** - * Access relaxed for tests - */ - protected long runOnce(Long SLEEP_INTERVAL) { - long currentTime = clock.getTime(); - long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime); - long sleepTime = SLEEP_INTERVAL; - - if (lastCheckPointTime != -1) { - LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: " - + ((clock.getTime() - lastCheckPointTime) / 1000) - + " seconds."); - - long startTime = clock.getTime(); - boolean success = doWork(lastCheckPointTime, - lastCheckPointTime + SLEEP_INTERVAL); - long executionTime = clock.getTime() - startTime; - long delta = SLEEP_INTERVAL - executionTime; - - if (delta > 0) { - // Sleep for (configured sleep - time to execute task) - sleepTime = delta; - } else { - // No sleep because last run took too long to execute - LOG.info("Aggregator execution took too long, " + - "cancelling sleep. executionTime = " + executionTime); - sleepTime = 1; - } - - LOG.debug("Aggregator sleep interval = " + sleepTime); - - if (success) { - try { - // Comment to bug fix: - // cannot just save lastCheckPointTime + SLEEP_INTERVAL, - // it has to be verified so it is not a time in the future - // checkpoint says what was aggregated, and there is no way - // the future metrics were aggregated! - saveCheckPoint(Math.min(currentTime, lastCheckPointTime + - SLEEP_INTERVAL)); - } catch (IOException io) { - LOG.warn("Error saving checkpoint, restarting aggregation at " + - "previous checkpoint."); - } - } - } - - return sleepTime; - } - - private long readLastCheckpointSavingOnFirstRun(long currentTime) { - long lastCheckPointTime = -1; - - try { - lastCheckPointTime = readCheckPoint(); - if (isLastCheckPointTooOld(lastCheckPointTime)) { - LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " + - "lastCheckPointTime = " + lastCheckPointTime); - lastCheckPointTime = -1; - } - if (lastCheckPointTime == -1) { - // Assuming first run, save checkpoint and sleep. - // Set checkpoint to 2 minutes in the past to allow the - // agents/collectors to catch up - LOG.info("Saving checkpoint time on first run." + - (currentTime - checkpointDelayMillis)); - saveCheckPoint(currentTime - checkpointDelayMillis); - } - } catch (IOException io) { - LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io); - } - return lastCheckPointTime; - } - - private boolean isLastCheckPointTooOld(long checkpoint) { - // first checkpoint is saved checkpointDelayMillis in the past, - // so here we also need to take it into account - return checkpoint != -1 && - ((clock.getTime() - checkpoint - checkpointDelayMillis) > - getCheckpointCutOffIntervalMillis()); - } - - protected long readCheckPoint() { - try { - File checkpoint = new File(getCheckpointLocation()); - if (checkpoint.exists()) { - String contents = FileUtils.readFileToString(checkpoint); - if (contents != null && !contents.isEmpty()) { - return Long.parseLong(contents); - } - } - } catch (IOException io) { - LOG.debug(io); - } - return -1; - } - - protected void saveCheckPoint(long checkpointTime) throws IOException { - File checkpoint = new File(getCheckpointLocation()); - if (!checkpoint.exists()) { - boolean done = checkpoint.createNewFile(); - if (!done) { - throw new IOException("Could not create checkpoint at location, " + - getCheckpointLocation()); - } - } - FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime)); - } - - /** - * Read metrics written during the time interval and save the sum and total - * in the aggregate table. - * - * @param startTime Sample start time - * @param endTime Sample end time - */ - protected boolean doWork(long startTime, long endTime) { - LOG.info("Start aggregation cycle @ " + new Date() + ", " + - "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime)); - - boolean success = true; - PhoenixTransactSQL.Condition condition = - prepareMetricQueryCondition(startTime, endTime); - - Connection conn = null; - PreparedStatement stmt = null; - ResultSet rs = null; - - try { - conn = hBaseAccessor.getConnection(); - // FLUME 2. aggregate and ignore the instance - stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); - - LOG.debug("Query issued @: " + new Date()); - rs = stmt.executeQuery(); - LOG.debug("Query returned @: " + new Date()); - - aggregate(rs, startTime, endTime); - LOG.info("End aggregation cycle @ " + new Date()); - - } catch (SQLException e) { - LOG.error("Exception during aggregating metrics.", e); - success = false; - } catch (IOException e) { - LOG.error("Exception during aggregating metrics.", e); - success = false; - } finally { - if (rs != null) { - try { - rs.close(); - } catch (SQLException e) { - // Ignore - } - } - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - // Ignore - } - } - if (conn != null) { - try { - conn.close(); - } catch (SQLException sql) { - // Ignore - } - } - } - - LOG.info("End aggregation cycle @ " + new Date()); - return success; - } - - protected abstract PhoenixTransactSQL.Condition - prepareMetricQueryCondition(long startTime, long endTime); - - protected abstract void aggregate(ResultSet rs, long startTime, long endTime) - throws IOException, SQLException; - - protected abstract Long getSleepIntervalMillis(); - - protected abstract Integer getCheckpointCutOffMultiplier(); - - protected Long getCheckpointCutOffIntervalMillis() { - return getCheckpointCutOffMultiplier() * getSleepIntervalMillis(); - } - - protected abstract boolean isDisabled(); - - protected abstract String getCheckpointLocation(); -} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AggregatorUtils.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AggregatorUtils.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AggregatorUtils.java deleted file mode 100644 index fbea248..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AggregatorUtils.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - - -import java.util.Map; - -/** - * - */ -public class AggregatorUtils { - - public static double[] calculateAggregates(Map<Long, Double> metricValues) { - double[] values = new double[4]; - double max = Double.MIN_VALUE; - double min = Double.MAX_VALUE; - double sum = 0.0; - int metricCount = 0; - - if (metricValues != null && !metricValues.isEmpty()) { - for (Double value : metricValues.values()) { - // TODO: Some nulls in data - need to investigate null values from host - if (value != null) { - if (value > max) { - max = value; - } - if (value < min) { - min = value; - } - sum += value; - } - } - metricCount = metricValues.values().size(); - } - // BR: WHY ZERO is a good idea? - values[0] = sum; - values[1] = max != Double.MIN_VALUE ? max : 0.0; - values[2] = min != Double.MAX_VALUE ? min : 0.0; - values[3] = metricCount; - - return values; - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java deleted file mode 100644 index 34da78b..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - - -import java.sql.Connection; -import java.sql.SQLException; - -/** - * - */ -public interface ConnectionProvider { - public Connection getConnection() throws SQLException; -} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java deleted file mode 100644 index 47db730..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; - -public class DefaultPhoenixDataSource implements ConnectionProvider { - - static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class); - private static final String ZOOKEEPER_CLIENT_PORT = - "hbase.zookeeper.property.clientPort"; - private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; - private static final String ZNODE_PARENT = "zookeeper.znode.parent"; - - private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s"; - private final String url; - - public DefaultPhoenixDataSource(Configuration hbaseConf) { - String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT, - "2181"); - String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM); - String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/hbase"); - if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) { - throw new IllegalStateException("Unable to find Zookeeper quorum to " + - "access HBase store using Phoenix."); - } - - url = String.format(connectionUrl, - zookeeperQuorum, - zookeeperClientPort, - znodeParent); - } - - /** - * Get JDBC connection to HBase store. Assumption is that the hbase - * configuration is present on the classpath and loaded by the caller into - * the Configuration object. - * Phoenix already caches the HConnection between the client and HBase - * cluster. - * - * @return @java.sql.Connection - */ - public Connection getConnection() throws SQLException { - - LOG.debug("Metric store connection url: " + url); - try { - return DriverManager.getConnection(url); - } catch (SQLException e) { - LOG.warn("Unable to connect to HBase store using Phoenix.", e); - - throw e; - } - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Function.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Function.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Function.java deleted file mode 100644 index 11245d8..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Function.java +++ /dev/null @@ -1,169 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - -/** - * Is used to determine metrics aggregate table. - * - * @see org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.TimelineWebServices#getTimelineMetric - * @see org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.TimelineWebServices#getTimelineMetrics - */ -public class Function { - public static Function DEFAULT_VALUE_FUNCTION = - new Function(ReadFunction.VALUE, null); - private static final String SUFFIX_SEPARATOR = "\\._"; - - private ReadFunction readFunction = ReadFunction.VALUE; - private PostProcessingFunction postProcessingFunction = null; - - public Function(){ - - } - - public Function(ReadFunction readFunction, - PostProcessingFunction ppFunction){ - if (readFunction!=null){ - this.readFunction = readFunction ; - } - this.postProcessingFunction = ppFunction; - } - - public static Function fromMetricName(String metricName){ - // gets postprocessing, and aggregation function - // ex. Metric._rate._avg - String[] parts = metricName.split(SUFFIX_SEPARATOR); - - ReadFunction readFunction = ReadFunction.VALUE; - PostProcessingFunction ppFunction = null; - - if (parts.length == 3) { - ppFunction = PostProcessingFunction.getFunction(parts[1]); - readFunction = ReadFunction.getFunction(parts[2]); - } else if (parts.length == 2) { - ppFunction = null; - readFunction = ReadFunction.getFunction(parts[1]); - } - - - return new Function(readFunction, ppFunction); - } - - public String getSuffix(){ - return (postProcessingFunction == null)? readFunction.getSuffix() : - postProcessingFunction.getSuffix() + readFunction.getSuffix(); - } - - public ReadFunction getReadFunction() { - return readFunction; - } - - @Override - public String toString() { - return "Function{" + - "readFunction=" + readFunction + - ", postProcessingFunction=" + postProcessingFunction + - '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof Function)) return false; - - Function function = (Function) o; - - return postProcessingFunction == function.postProcessingFunction - && readFunction == function.readFunction; - - } - - @Override - public int hashCode() { - int result = readFunction.hashCode(); - result = 31 * result + (postProcessingFunction != null ? - postProcessingFunction.hashCode() : 0); - return result; - } - - public enum PostProcessingFunction { - NONE(""), - RATE("._rate"); - - PostProcessingFunction(String suffix){ - this.suffix = suffix; - } - - private String suffix = ""; - - public String getSuffix(){ - return suffix; - } - - public static PostProcessingFunction getFunction(String functionName) throws - FunctionFormatException { - if (functionName == null) { - return NONE; - } - - try { - return PostProcessingFunction.valueOf(functionName.toUpperCase()); - } catch (IllegalArgumentException e) { - throw new FunctionFormatException("Function should be value, avg, min, " + - "max", e); - } - } - } - - public enum ReadFunction { - VALUE(""), - AVG("._avg"), - MIN("._min"), - MAX("._max"), - SUM("._sum"); - - private final String suffix; - - ReadFunction(String suffix){ - this.suffix = suffix; - } - - public String getSuffix() { - return suffix; - } - - public static ReadFunction getFunction(String functionName) throws - FunctionFormatException { - if (functionName == null) { - return VALUE; - } - try { - return ReadFunction.valueOf(functionName.toUpperCase()); - } catch (IllegalArgumentException e) { - throw new FunctionFormatException( - "Function should be value, avg, min, max. Got " + functionName, e); - } - } - } - - public static class FunctionFormatException extends IllegalArgumentException { - public FunctionFormatException(String message, Throwable cause) { - super(message, cause); - } - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java index a4980b4..1fac404 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -25,6 +25,14 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregator; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricClusterAggregatorHourly; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; + import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; @@ -33,8 +41,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition; public class HBaseTimelineMetricStore extends AbstractService implements TimelineMetricStore { http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java deleted file mode 100644 index 61e15d7..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.codehaus.jackson.annotate.JsonProperty; -import org.codehaus.jackson.annotate.JsonSubTypes; -import org.codehaus.jackson.map.ObjectMapper; - -import java.io.IOException; - -/** -* -*/ -@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class), - @JsonSubTypes.Type(value = MetricHostAggregate.class)}) -@InterfaceAudience.Public -@InterfaceStability.Unstable -public class MetricAggregate { - private static final ObjectMapper mapper = new ObjectMapper(); - - protected Double sum = 0.0; - protected Double deviation; - protected Double max = Double.MIN_VALUE; - protected Double min = Double.MAX_VALUE; - - public MetricAggregate() { - } - - MetricAggregate(Double sum, Double deviation, Double max, - Double min) { - this.sum = sum; - this.deviation = deviation; - this.max = max; - this.min = min; - } - - void updateSum(Double sum) { - this.sum += sum; - } - - void updateMax(Double max) { - if (max > this.max) { - this.max = max; - } - } - - void updateMin(Double min) { - if (min < this.min) { - this.min = min; - } - } - - @JsonProperty("sum") - Double getSum() { - return sum; - } - - @JsonProperty("deviation") - Double getDeviation() { - return deviation; - } - - @JsonProperty("max") - Double getMax() { - return max; - } - - @JsonProperty("min") - Double getMin() { - return min; - } - - public void setSum(Double sum) { - this.sum = sum; - } - - public void setDeviation(Double deviation) { - this.deviation = deviation; - } - - public void setMax(Double max) { - this.max = max; - } - - public void setMin(Double min) { - this.min = min; - } - - public String toJSON() throws IOException { - return mapper.writeValueAsString(this); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java deleted file mode 100644 index c13c85f..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - - -import org.codehaus.jackson.annotate.JsonCreator; -import org.codehaus.jackson.annotate.JsonProperty; - -/** -* -*/ -public class MetricClusterAggregate extends MetricAggregate { - private int numberOfHosts; - - @JsonCreator - public MetricClusterAggregate() { - } - - MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation, - Double max, Double min) { - super(sum, deviation, max, min); - this.numberOfHosts = numberOfHosts; - } - - @JsonProperty("numberOfHosts") - int getNumberOfHosts() { - return numberOfHosts; - } - - void updateNumberOfHosts(int count) { - this.numberOfHosts += count; - } - - public void setNumberOfHosts(int numberOfHosts) { - this.numberOfHosts = numberOfHosts; - } - - /** - * Find and update min, max and avg for a minute - */ - void updateAggregates(MetricClusterAggregate hostAggregate) { - updateMax(hostAggregate.getMax()); - updateMin(hostAggregate.getMin()); - updateSum(hostAggregate.getSum()); - updateNumberOfHosts(hostAggregate.getNumberOfHosts()); - } - - @Override - public String toString() { -// MetricClusterAggregate - return "MetricAggregate{" + - "sum=" + sum + - ", numberOfHosts=" + numberOfHosts + - ", deviation=" + deviation + - ", max=" + max + - ", min=" + min + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java deleted file mode 100644 index 02cc207..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - - -import org.codehaus.jackson.annotate.JsonCreator; -import org.codehaus.jackson.annotate.JsonProperty; - -/** - * Represents a collection of minute based aggregation of values for - * resolution greater than a minute. - */ -public class MetricHostAggregate extends MetricAggregate { - - private long numberOfSamples = 0; - - @JsonCreator - public MetricHostAggregate() { - super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE); - } - - public MetricHostAggregate(Double sum, int numberOfSamples, - Double deviation, - Double max, Double min) { - super(sum, deviation, max, min); - this.numberOfSamples = numberOfSamples; - } - - @JsonProperty("numberOfSamples") - long getNumberOfSamples() { - return numberOfSamples == 0 ? 1 : numberOfSamples; - } - - void updateNumberOfSamples(long count) { - this.numberOfSamples += count; - } - - public void setNumberOfSamples(long numberOfSamples) { - this.numberOfSamples = numberOfSamples; - } - - public double getAvg() { - return sum / numberOfSamples; - } - - /** - * Find and update min, max and avg for a minute - */ - void updateAggregates(MetricHostAggregate hostAggregate) { - updateMax(hostAggregate.getMax()); - updateMin(hostAggregate.getMin()); - updateSum(hostAggregate.getSum()); - updateNumberOfSamples(hostAggregate.getNumberOfSamples()); - } - - @Override - public String toString() { - return "MetricHostAggregate{" + - "sum=" + sum + - ", numberOfSamples=" + numberOfSamples + - ", deviation=" + deviation + - ", max=" + max + - ", min=" + min + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index 6a38517..2e78912 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -24,6 +24,17 @@ import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.phoenix.exception.SQLExceptionCode; import org.codehaus.jackson.map.ObjectMapper; @@ -41,25 +52,6 @@ import java.util.Map; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.ALTER_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.SplitByMetricNamesCondition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_ENCODING; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_METRICS_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES; @@ -70,6 +62,23 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL; /** * Provides a facade over the Phoenix API to access HBase schema @@ -77,7 +86,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti public class PhoenixHBaseAccessor { private static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class); - private static final TimelineMetricReader timelineMetricReader = new TimelineMetricReader(); + private static final TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper(); private final Configuration hbaseConf; private final Configuration metricsConf; private final RetryCounterFactory retryCounterFactory; @@ -151,14 +160,14 @@ public class PhoenixHBaseAccessor { private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs) throws SQLException, IOException { - TimelineMetric metric = timelineMetricReader + TimelineMetric metric = TIMELINE_METRIC_READ_HELPER .getTimelineMetricCommonsFromResultSet(rs); metric.setMetricValues(readLastMetricValueFromJSON(rs.getString("METRICS"))); return metric; } - static TimelineMetric getAggregatedTimelineMetricFromResultSet( + public static TimelineMetric getAggregatedTimelineMetricFromResultSet( ResultSet rs, Function f) throws SQLException, IOException { TimelineMetric metric = new TimelineMetric(); @@ -214,7 +223,7 @@ public class PhoenixHBaseAccessor { return (Map<Long, Double>) mapper.readValue(json, metricValuesTypeRef); } - static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs) + public static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs) throws SQLException, IOException { TimelineMetric metric = new TimelineMetric(); metric.setMetricName(rs.getString("METRIC_NAME")); @@ -226,7 +235,7 @@ public class PhoenixHBaseAccessor { return metric; } - static MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs) + public static MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs) throws SQLException { MetricHostAggregate metricHostAggregate = new MetricHostAggregate(); metricHostAggregate.setSum(rs.getDouble("METRIC_SUM")); @@ -238,7 +247,7 @@ public class PhoenixHBaseAccessor { return metricHostAggregate; } - static MetricClusterAggregate getMetricClusterAggregateFromResultSet(ResultSet rs) + public static MetricClusterAggregate getMetricClusterAggregateFromResultSet(ResultSet rs) throws SQLException { MetricClusterAggregate agg = new MetricClusterAggregate(); agg.setSum(rs.getDouble("METRIC_SUM")); @@ -474,7 +483,7 @@ public class PhoenixHBaseAccessor { } else { TimelineMetric metric; - metric = timelineMetricReader.getTimelineMetricFromResultSet(rs); + metric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricFromResultSet(rs); if (condition.isGrouped()) { metrics.addOrMergeTimelineMetric(metric); http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java deleted file mode 100644 index 2cdefa9..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java +++ /dev/null @@ -1,970 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.util.Collections; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -/** - * Encapsulate all metrics related SQL queries. - */ -public class PhoenixTransactSQL { - - static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class); - /** - * Create table to store individual metric records. - */ - public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " + - "EXISTS METRIC_RECORD (METRIC_NAME VARCHAR, " + - "HOSTNAME VARCHAR, " + - "SERVER_TIME UNSIGNED_LONG NOT NULL, " + - "APP_ID VARCHAR, " + - "INSTANCE_ID VARCHAR, " + - "START_TIME UNSIGNED_LONG, " + - "UNITS CHAR(20), " + - "METRIC_SUM DOUBLE, " + - "METRIC_COUNT UNSIGNED_INT, " + - "METRIC_MAX DOUBLE, " + - "METRIC_MIN DOUBLE, " + - "METRICS VARCHAR CONSTRAINT pk " + - "PRIMARY KEY (METRIC_NAME, HOSTNAME, SERVER_TIME, APP_ID, " + - "INSTANCE_ID)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + - "TTL=%s, COMPRESSION='%s'"; - - public static final String CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL = - "CREATE TABLE IF NOT EXISTS METRIC_RECORD_HOURLY " + - "(METRIC_NAME VARCHAR, " + - "HOSTNAME VARCHAR, " + - "APP_ID VARCHAR, " + - "INSTANCE_ID VARCHAR, " + - "SERVER_TIME UNSIGNED_LONG NOT NULL, " + - "UNITS CHAR(20), " + - "METRIC_SUM DOUBLE," + - "METRIC_COUNT UNSIGNED_INT, " + - "METRIC_MAX DOUBLE," + - "METRIC_MIN DOUBLE CONSTRAINT pk " + - "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " + - "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + - "TTL=%s, COMPRESSION='%s'"; - - public static final String CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL = - "CREATE TABLE IF NOT EXISTS METRIC_RECORD_MINUTE " + - "(METRIC_NAME VARCHAR, " + - "HOSTNAME VARCHAR, " + - "APP_ID VARCHAR, " + - "INSTANCE_ID VARCHAR, " + - "SERVER_TIME UNSIGNED_LONG NOT NULL, " + - "UNITS CHAR(20), " + - "METRIC_SUM DOUBLE," + - "METRIC_COUNT UNSIGNED_INT, " + - "METRIC_MAX DOUBLE," + - "METRIC_MIN DOUBLE CONSTRAINT pk " + - "PRIMARY KEY (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " + - "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," + - " COMPRESSION='%s'"; - - public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL = - "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE " + - "(METRIC_NAME VARCHAR, " + - "APP_ID VARCHAR, " + - "INSTANCE_ID VARCHAR, " + - "SERVER_TIME UNSIGNED_LONG NOT NULL, " + - "UNITS CHAR(20), " + - "METRIC_SUM DOUBLE, " + - "HOSTS_COUNT UNSIGNED_INT, " + - "METRIC_MAX DOUBLE, " + - "METRIC_MIN DOUBLE " + - "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " + - "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + - "TTL=%s, COMPRESSION='%s'"; - - public static final String CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL = - "CREATE TABLE IF NOT EXISTS METRIC_AGGREGATE_HOURLY " + - "(METRIC_NAME VARCHAR, " + - "APP_ID VARCHAR, " + - "INSTANCE_ID VARCHAR, " + - "SERVER_TIME UNSIGNED_LONG NOT NULL, " + - "UNITS CHAR(20), " + - "METRIC_SUM DOUBLE, " + - "METRIC_COUNT UNSIGNED_INT, " + - "METRIC_MAX DOUBLE, " + - "METRIC_MIN DOUBLE " + - "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID, INSTANCE_ID, " + - "SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + - "TTL=%s, COMPRESSION='%s'"; - - /** - * ALTER table to set new options - */ - public static final String ALTER_SQL = "ALTER TABLE %s SET TTL=%s"; - - /** - * Insert into metric records table. - */ - public static final String UPSERT_METRICS_SQL = "UPSERT INTO %s " + - "(METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, " + - "UNITS, " + - "METRIC_SUM, " + - "METRIC_MAX, " + - "METRIC_MIN, " + - "METRIC_COUNT, " + - "METRICS) VALUES " + - "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; - - public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " + - "METRIC_AGGREGATE (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " + - "UNITS, " + - "METRIC_SUM, " + - "HOSTS_COUNT, " + - "METRIC_MAX, " + - "METRIC_MIN) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; - - public static final String UPSERT_CLUSTER_AGGREGATE_TIME_SQL = "UPSERT INTO" + - " %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " + - "UNITS, " + - "METRIC_SUM, " + - "METRIC_COUNT, " + - "METRIC_MAX, " + - "METRIC_MIN) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; - - - public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " + - "%s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, " + - "SERVER_TIME, " + - "UNITS, " + - "METRIC_SUM, " + - "METRIC_MAX, " + - "METRIC_MIN," + - "METRIC_COUNT) " + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; - - /** - * Retrieve a set of rows from metrics records table. - */ - public static final String GET_METRIC_SQL = "SELECT %s METRIC_NAME, " + - "HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, START_TIME, UNITS, " + - "METRIC_SUM, " + - "METRIC_MAX, " + - "METRIC_MIN, " + - "METRIC_COUNT, " + - "METRICS " + - "FROM %s"; - - public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT %s " + - "METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, " + - "UNITS, " + - "METRIC_SUM, " + - "METRIC_MAX, " + - "METRIC_MIN, " + - "METRIC_COUNT " + - "FROM %s"; - - public static final String GET_CLUSTER_AGGREGATE_SQL = "SELECT %s " + - "METRIC_NAME, APP_ID, " + - "INSTANCE_ID, SERVER_TIME, " + - "UNITS, " + - "METRIC_SUM, " + - "HOSTS_COUNT, " + - "METRIC_MAX, " + - "METRIC_MIN " + - "FROM %s"; - - public static final String GET_CLUSTER_AGGREGATE_HOURLY_SQL = "SELECT %s " + - "METRIC_NAME, APP_ID, " + - "INSTANCE_ID, SERVER_TIME, " + - "UNITS, " + - "METRIC_SUM, " + - "METRIC_COUNT, " + - "METRIC_MAX, " + - "METRIC_MIN " + - "FROM %s"; - - public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD"; - public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME = - "METRIC_RECORD_MINUTE"; - public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME = - "METRIC_RECORD_HOURLY"; - public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME = - "METRIC_AGGREGATE"; - public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME = - "METRIC_AGGREGATE_HOURLY"; - public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY"; - public static final String DEFAULT_ENCODING = "FAST_DIFF"; - public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes - public static final long HOUR = 3600000; // 1 hour - public static final long DAY = 86400000; // 1 day - - /** - * Filter to optimize HBase scan by using file timestamps. This prevents - * a full table scan of metric records. - * - * @return Phoenix Hint String - */ - public static String getNaiveTimeRangeHint(Long startTime, Long delta) { - return String.format("/*+ NATIVE_TIME_RANGE(%s) */", (startTime - delta)); - } - - public static PreparedStatement prepareGetMetricsSqlStmt( - Connection connection, Condition condition) throws SQLException { - - validateConditionIsNotEmpty(condition); - validateRowCountLimit(condition); - - String stmtStr; - if (condition.getStatement() != null) { - stmtStr = condition.getStatement(); - } else { - - String metricsTable; - String query; - if (condition.getPrecision() == null) { - long endTime = condition.getEndTime() == null ? System.currentTimeMillis() : condition.getEndTime(); - long startTime = condition.getStartTime() == null ? 0 : condition.getStartTime(); - Long timeRange = endTime - startTime; - if (timeRange > 5 * DAY) { - metricsTable = METRICS_AGGREGATE_HOURLY_TABLE_NAME; - query = GET_METRIC_AGGREGATE_ONLY_SQL; - condition.setPrecision(Precision.HOURS); - } else if (timeRange > 10 * HOUR) { - metricsTable = METRICS_AGGREGATE_MINUTE_TABLE_NAME; - query = GET_METRIC_AGGREGATE_ONLY_SQL; - condition.setPrecision(Precision.MINUTES); - } else { - metricsTable = METRICS_RECORD_TABLE_NAME; - query = GET_METRIC_SQL; - condition.setPrecision(Precision.SECONDS); - } - } else { - switch (condition.getPrecision()) { - case HOURS: - metricsTable = METRICS_AGGREGATE_HOURLY_TABLE_NAME; - query = GET_METRIC_AGGREGATE_ONLY_SQL; - break; - case MINUTES: - metricsTable = METRICS_AGGREGATE_MINUTE_TABLE_NAME; - query = GET_METRIC_AGGREGATE_ONLY_SQL; - break; - default: - metricsTable = METRICS_RECORD_TABLE_NAME; - query = GET_METRIC_SQL; - } - } - - stmtStr = String.format(query, - getNaiveTimeRangeHint(condition.getStartTime(), NATIVE_TIME_RANGE_DELTA), - metricsTable); - } - - StringBuilder sb = new StringBuilder(stmtStr); - sb.append(" WHERE "); - sb.append(condition.getConditionClause()); - String orderByClause = condition.getOrderByClause(true); - - if (orderByClause != null) { - sb.append(orderByClause); - } else { - sb.append(" ORDER BY METRIC_NAME, SERVER_TIME "); - } - if (condition.getLimit() != null) { - sb.append(" LIMIT ").append(condition.getLimit()); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("SQL: " + sb.toString() + ", condition: " + condition); - } - PreparedStatement stmt = connection.prepareStatement(sb.toString()); - int pos = 1; - if (condition.getMetricNames() != null) { - for (; pos <= condition.getMetricNames().size(); pos++) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1)); - } - stmt.setString(pos, condition.getMetricNames().get(pos - 1)); - } - } - if (condition.getHostname() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname()); - } - stmt.setString(pos++, condition.getHostname()); - } - if (condition.getAppId() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId()); - } - stmt.setString(pos++, condition.getAppId()); - } - if (condition.getInstanceId() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId()); - } - stmt.setString(pos++, condition.getInstanceId()); - } - if (condition.getStartTime() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime()); - } - stmt.setLong(pos++, condition.getStartTime()); - } - if (condition.getEndTime() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime()); - } - stmt.setLong(pos, condition.getEndTime()); - } - if (condition.getFetchSize() != null) { - stmt.setFetchSize(condition.getFetchSize()); - } - - return stmt; - } - - private static void validateConditionIsNotEmpty(Condition condition) { - if (condition.isEmpty()) { - throw new IllegalArgumentException("Condition is empty."); - } - } - - private static void validateRowCountLimit(Condition condition) { - if (condition.getMetricNames() == null - || condition.getMetricNames().size() ==0 ) { - //aggregator can use empty metrics query - return; - } - - long range = condition.getEndTime() - condition.getStartTime(); - long rowsPerMetric = TimeUnit.MILLISECONDS.toHours(range) + 1; - - Precision precision = condition.getPrecision(); - // for minutes and seconds we can use the rowsPerMetric computed based on - // minutes - if (precision != null && precision == Precision.HOURS) { - rowsPerMetric = TimeUnit.MILLISECONDS.toHours(range) + 1; - } - - long totalRowsRequested = rowsPerMetric * condition.getMetricNames().size(); - if (totalRowsRequested > PhoenixHBaseAccessor.RESULTSET_LIMIT) { - throw new IllegalArgumentException("The time range query for " + - "precision table exceeds row count limit, please query aggregate " + - "table instead."); - } - } - - public static PreparedStatement prepareGetLatestMetricSqlStmt( - Connection connection, Condition condition) throws SQLException { - - validateConditionIsNotEmpty(condition); - - if (condition.getMetricNames() == null - || condition.getMetricNames().size() == 0) { - throw new IllegalArgumentException("Point in time query without " + - "metric names not supported "); - } - - String stmtStr; - if (condition.getStatement() != null) { - stmtStr = condition.getStatement(); - } else { - stmtStr = String.format(GET_METRIC_SQL, - "", - METRICS_RECORD_TABLE_NAME); - } - - StringBuilder sb = new StringBuilder(stmtStr); - sb.append(" WHERE "); - sb.append(condition.getConditionClause()); - String orderByClause = condition.getOrderByClause(false); - if (orderByClause != null) { - sb.append(orderByClause); - } else { - sb.append(" ORDER BY METRIC_NAME DESC, HOSTNAME DESC, SERVER_TIME DESC "); - } - - sb.append(" LIMIT ").append(condition.getMetricNames().size()); - - if (LOG.isDebugEnabled()) { - LOG.debug("SQL: " + sb.toString() + ", condition: " + condition); - } - PreparedStatement stmt = connection.prepareStatement(sb.toString()); - int pos = 1; - if (condition.getMetricNames() != null) { - //IGNORE condition limit, set one based on number of metric names - for (; pos <= condition.getMetricNames().size(); pos++) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1)); - } - stmt.setString(pos, condition.getMetricNames().get(pos - 1)); - } - } - if (condition.getHostname() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname()); - } - stmt.setString(pos++, condition.getHostname()); - } - if (condition.getAppId() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId()); - } - stmt.setString(pos++, condition.getAppId()); - } - if (condition.getInstanceId() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId()); - } - stmt.setString(pos++, condition.getInstanceId()); - } - - if (condition.getFetchSize() != null) { - stmt.setFetchSize(condition.getFetchSize()); - } - - return stmt; - } - - public static PreparedStatement prepareGetAggregateSqlStmt( - Connection connection, Condition condition) throws SQLException { - - validateConditionIsNotEmpty(condition); - - String metricsAggregateTable; - String queryStmt; - if (condition.getPrecision() == null) { - long endTime = condition.getEndTime() == null ? System.currentTimeMillis() : condition.getEndTime(); - long startTime = condition.getStartTime() == null ? 0 : condition.getStartTime(); - Long timeRange = endTime - startTime; - if (timeRange > 5 * DAY) { - metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; - queryStmt = GET_CLUSTER_AGGREGATE_HOURLY_SQL; - condition.setPrecision(Precision.HOURS); - } else { - metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME; - queryStmt = GET_CLUSTER_AGGREGATE_SQL; - condition.setPrecision(Precision.SECONDS); - } - } else { - switch (condition.getPrecision()) { - case HOURS: - metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; - queryStmt = GET_CLUSTER_AGGREGATE_HOURLY_SQL; - break; - default: - metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME; - queryStmt = GET_CLUSTER_AGGREGATE_SQL; - } - } - - StringBuilder sb = new StringBuilder(queryStmt); - sb.append(" WHERE "); - sb.append(condition.getConditionClause()); - sb.append(" ORDER BY METRIC_NAME, SERVER_TIME"); - if (condition.getLimit() != null) { - sb.append(" LIMIT ").append(condition.getLimit()); - } - - String query = String.format(sb.toString(), - PhoenixTransactSQL.getNaiveTimeRangeHint(condition.getStartTime(), - NATIVE_TIME_RANGE_DELTA), metricsAggregateTable); - if (LOG.isDebugEnabled()) { - LOG.debug("SQL => " + query + ", condition => " + condition); - } - PreparedStatement stmt = connection.prepareStatement(query); - int pos = 1; - if (condition.getMetricNames() != null) { - for (; pos <= condition.getMetricNames().size(); pos++) { - stmt.setString(pos, condition.getMetricNames().get(pos - 1)); - } - } - // TODO: Upper case all strings on POST - if (condition.getAppId() != null) { - stmt.setString(pos++, condition.getAppId()); - } - if (condition.getInstanceId() != null) { - stmt.setString(pos++, condition.getInstanceId()); - } - if (condition.getStartTime() != null) { - stmt.setLong(pos++, condition.getStartTime()); - } - if (condition.getEndTime() != null) { - stmt.setLong(pos, condition.getEndTime()); - } - - return stmt; - } - - public static PreparedStatement prepareGetLatestAggregateMetricSqlStmt( - Connection connection, Condition condition) throws SQLException { - - validateConditionIsNotEmpty(condition); - - String stmtStr; - if (condition.getStatement() != null) { - stmtStr = condition.getStatement(); - } else { - stmtStr = String.format(GET_CLUSTER_AGGREGATE_SQL, "", - METRICS_CLUSTER_AGGREGATE_TABLE_NAME); - } - - StringBuilder sb = new StringBuilder(stmtStr); - sb.append(" WHERE "); - sb.append(condition.getConditionClause()); - String orderByClause = condition.getOrderByClause(false); - if (orderByClause != null) { - sb.append(orderByClause); - } else { - sb.append(" ORDER BY METRIC_NAME DESC, SERVER_TIME DESC "); - } - - sb.append(" LIMIT ").append(condition.getMetricNames().size()); - - String query = sb.toString(); - if (LOG.isDebugEnabled()) { - LOG.debug("SQL: " + query + ", condition: " + condition); - } - - PreparedStatement stmt = connection.prepareStatement(query); - int pos = 1; - if (condition.getMetricNames() != null) { - for (; pos <= condition.getMetricNames().size(); pos++) { - stmt.setString(pos, condition.getMetricNames().get(pos - 1)); - } - } - if (condition.getAppId() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId()); - } - stmt.setString(pos++, condition.getAppId()); - } - if (condition.getInstanceId() != null) { - stmt.setString(pos++, condition.getInstanceId()); - } - - return stmt; - } - - static interface Condition { - - boolean isEmpty(); - - List<String> getMetricNames(); - boolean isPointInTime(); - boolean isGrouped(); - void setStatement(String statement); - String getHostname(); - Precision getPrecision(); - void setPrecision(Precision precision); - String getAppId(); - String getInstanceId(); - StringBuilder getConditionClause(); - String getOrderByClause(boolean asc); - String getStatement(); - Long getStartTime(); - Long getEndTime(); - Integer getLimit(); - Integer getFetchSize(); - void setFetchSize(Integer fetchSize); - void addOrderByColumn(String column); - void setNoLimit(); - } - - static class DefaultCondition implements Condition { - List<String> metricNames; - String hostname; - String appId; - String instanceId; - Long startTime; - Long endTime; - Precision precision; - Integer limit; - boolean grouped; - boolean noLimit = false; - Integer fetchSize; - String statement; - Set<String> orderByColumns = new LinkedHashSet<String>(); - - DefaultCondition(List<String> metricNames, String hostname, String appId, - String instanceId, Long startTime, Long endTime, Precision precision, - Integer limit, boolean grouped) { - this.metricNames = metricNames; - this.hostname = hostname; - this.appId = appId; - this.instanceId = instanceId; - this.startTime = startTime; - this.endTime = endTime; - this.precision = precision; - this.limit = limit; - this.grouped = grouped; - } - - public String getStatement() { - return statement; - } - - public void setStatement(String statement) { - this.statement = statement; - } - - public List<String> getMetricNames() { - return metricNames == null || metricNames.isEmpty() ? null : metricNames; - } - - public StringBuilder getConditionClause() { - StringBuilder sb = new StringBuilder(); - boolean appendConjunction = false; - StringBuilder metricsLike = new StringBuilder(); - StringBuilder metricsIn = new StringBuilder(); - - if (getMetricNames() != null) { - for (String name : getMetricNames()) { - if (name.contains("%")) { - if (metricsLike.length() > 1) { - metricsLike.append(" OR "); - } - metricsLike.append("METRIC_NAME LIKE ?"); - } else { - if (metricsIn.length() > 0) { - metricsIn.append(", "); - } - metricsIn.append("?"); - } - } - - if (metricsIn.length()>0) { - sb.append("(METRIC_NAME IN ("); - sb.append(metricsIn); - sb.append(")"); - appendConjunction = true; - } - - if (metricsLike.length() > 0) { - if (appendConjunction) { - sb.append(" OR "); - } else { - sb.append("("); - } - sb.append(metricsLike); - appendConjunction = true; - } - - if (appendConjunction) { - sb.append(")"); - } - } - - appendConjunction = append(sb, appendConjunction, getHostname(), " HOSTNAME = ?"); - appendConjunction = append(sb, appendConjunction, getAppId(), " APP_ID = ?"); - appendConjunction = append(sb, appendConjunction, getInstanceId(), " INSTANCE_ID = ?"); - appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?"); - append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?"); - - return sb; - } - - protected static boolean append(StringBuilder sb, - boolean appendConjunction, - Object value, String str) { - if (value != null) { - if (appendConjunction) { - sb.append(" AND"); - } - - sb.append(str); - appendConjunction = true; - } - return appendConjunction; - } - - public String getHostname() { - return hostname == null || hostname.isEmpty() ? null : hostname; - } - - public Precision getPrecision() { - return precision; - } - - public void setPrecision(Precision precision) { - this.precision = precision; - } - - public String getAppId() { - if (appId != null && !appId.isEmpty()) { - if (!(appId.equals("HOST") || appId.equals("FLUME_HANDLER")) ) { - return appId.toLowerCase(); - } else { - return appId; - } - } - return null; - } - - public String getInstanceId() { - return instanceId == null || instanceId.isEmpty() ? null : instanceId; - } - - /** - * Convert to millis. - */ - public Long getStartTime() { - if (startTime == null) { - return null; - } else if (startTime < 9999999999l) { - return startTime * 1000; - } else { - return startTime; - } - } - - public Long getEndTime() { - if (endTime == null) { - return null; - } - if (endTime < 9999999999l) { - return endTime * 1000; - } else { - return endTime; - } - } - - public void setNoLimit() { - this.noLimit = true; - } - - public Integer getLimit() { - if (noLimit) { - return null; - } - return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit; - } - - public boolean isGrouped() { - return grouped; - } - - public boolean isPointInTime() { - return getStartTime() == null && getEndTime() == null; - } - - public boolean isEmpty() { - return (metricNames == null || metricNames.isEmpty()) - && (hostname == null || hostname.isEmpty()) - && (appId == null || appId.isEmpty()) - && (instanceId == null || instanceId.isEmpty()) - && startTime == null - && endTime == null; - } - - public Integer getFetchSize() { - return fetchSize; - } - - public void setFetchSize(Integer fetchSize) { - this.fetchSize = fetchSize; - } - - public void addOrderByColumn(String column) { - orderByColumns.add(column); - } - - public String getOrderByClause(boolean asc) { - String orderByStr = " ORDER BY "; - if (!orderByColumns.isEmpty()) { - StringBuilder sb = new StringBuilder(orderByStr); - for (String orderByColumn : orderByColumns) { - if (sb.length() != orderByStr.length()) { - sb.append(", "); - } - sb.append(orderByColumn); - if (!asc) { - sb.append(" DESC"); - } - } - sb.append(" "); - return sb.toString(); - } - return null; - } - - @Override - public String toString() { - return "Condition{" + - "metricNames=" + metricNames + - ", hostname='" + hostname + '\'' + - ", appId='" + appId + '\'' + - ", instanceId='" + instanceId + '\'' + - ", startTime=" + startTime + - ", endTime=" + endTime + - ", limit=" + limit + - ", grouped=" + grouped + - ", orderBy=" + orderByColumns + - ", noLimit=" + noLimit + - '}'; - } - } - - static class SplitByMetricNamesCondition implements Condition { - private final Condition adaptee; - private String currentMetric; - - SplitByMetricNamesCondition(Condition condition){ - this.adaptee = condition; - } - - @Override - public boolean isEmpty() { - return adaptee.isEmpty(); - } - - @Override - public List<String> getMetricNames() { - return Collections.singletonList(currentMetric); - } - - @Override - public boolean isPointInTime() { - return adaptee.isPointInTime(); - } - - @Override - public boolean isGrouped() { - return adaptee.isGrouped(); - } - - @Override - public void setStatement(String statement) { - adaptee.setStatement(statement); - } - - @Override - public String getHostname() { - return adaptee.getHostname(); - } - - @Override - public Precision getPrecision() { - return adaptee.getPrecision(); - } - - @Override - public void setPrecision(Precision precision) { - adaptee.setPrecision(precision); - } - - @Override - public String getAppId() { - return adaptee.getAppId(); - } - - @Override - public String getInstanceId() { - return adaptee.getInstanceId(); - } - - @Override - public StringBuilder getConditionClause() { - StringBuilder sb = new StringBuilder(); - boolean appendConjunction = false; - - if (getMetricNames() != null) { - for (String name : getMetricNames()) { - if (sb.length() > 1) { - sb.append(" OR "); - } - sb.append("METRIC_NAME = ?"); - } - - appendConjunction = true; - } - - appendConjunction = DefaultCondition.append(sb, appendConjunction, - getHostname(), " HOSTNAME = ?"); - appendConjunction = DefaultCondition.append(sb, appendConjunction, - getAppId(), " APP_ID = ?"); - appendConjunction = DefaultCondition.append(sb, appendConjunction, - getInstanceId(), " INSTANCE_ID = ?"); - appendConjunction = DefaultCondition.append(sb, appendConjunction, - getStartTime(), " SERVER_TIME >= ?"); - DefaultCondition.append(sb, appendConjunction, getEndTime(), - " SERVER_TIME < ?"); - - return sb; - } - - @Override - public String getOrderByClause(boolean asc) { - return adaptee.getOrderByClause(asc); - } - - @Override - public String getStatement() { - return adaptee.getStatement(); - } - - @Override - public Long getStartTime() { - return adaptee.getStartTime(); - } - - @Override - public Long getEndTime() { - return adaptee.getEndTime(); - } - - @Override - public Integer getLimit() { - return adaptee.getLimit(); - } - - @Override - public Integer getFetchSize() { - return adaptee.getFetchSize(); - } - - @Override - public void setFetchSize(Integer fetchSize) { - adaptee.setFetchSize(fetchSize); - } - - @Override - public void addOrderByColumn(String column) { - adaptee.addOrderByColumn(column); - } - - @Override - public void setNoLimit() { - adaptee.setNoLimit(); - } - - public List<String> getOriginalMetricNames() { - return adaptee.getMetricNames(); - } - - public void setCurrentMetric(String currentMetric) { - this.currentMetric = currentMetric; - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/b93452ed/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java deleted file mode 100644 index d227993..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - -public class TimelineClusterMetric { - private String metricName; - private String appId; - private String instanceId; - private long timestamp; - private String type; - - TimelineClusterMetric(String metricName, String appId, String instanceId, - long timestamp, String type) { - this.metricName = metricName; - this.appId = appId; - this.instanceId = instanceId; - this.timestamp = timestamp; - this.type = type; - } - - String getMetricName() { - return metricName; - } - - String getAppId() { - return appId; - } - - String getInstanceId() { - return instanceId; - } - - long getTimestamp() { - return timestamp; - } - - String getType() { return type; } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TimelineClusterMetric that = (TimelineClusterMetric) o; - - if (timestamp != that.timestamp) return false; - if (appId != null ? !appId.equals(that.appId) : that.appId != null) - return false; - if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null) - return false; - if (!metricName.equals(that.metricName)) return false; - - return true; - } - - public boolean equalsExceptTime(TimelineClusterMetric metric) { - if (!metricName.equals(metric.metricName)) return false; - if (!appId.equals(metric.appId)) return false; - if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null) - return false; - - return true; - } - @Override - public int hashCode() { - int result = metricName.hashCode(); - result = 31 * result + (appId != null ? appId.hashCode() : 0); - result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0); - result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); - return result; - } - - @Override - public String toString() { - return "TimelineClusterMetric{" + - "metricName='" + metricName + '\'' + - ", appId='" + appId + '\'' + - ", instanceId='" + instanceId + '\'' + - ", timestamp=" + timestamp + - '}'; - } -}