Repository: ambari
Updated Branches:
  refs/heads/trunk da33e4371 -> 347dc63e0


AMBARI-16412 : Support TopN queries in AMS (avijayan)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/47c7b5ef
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/47c7b5ef
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/47c7b5ef

Branch: refs/heads/trunk
Commit: 47c7b5ef6d17ca199a1b3b5799e7c66fe9ecc606
Parents: da33e43
Author: Aravindan Vijayan <avija...@hortonworks.com>
Authored: Tue May 10 10:33:18 2016 -0700
Committer: Aravindan Vijayan <avija...@hortonworks.com>
Committed: Tue May 10 14:32:02 2016 -0700

----------------------------------------------------------------------
 .../metrics2/sink/timeline/TopNConfig.java      |  70 +++++++
 .../timeline/HBaseTimelineMetricStore.java      |  89 ++++----
 .../metrics/timeline/PhoenixHBaseAccessor.java  |  12 +-
 .../timeline/TimelineMetricConfiguration.java   |   3 +
 .../metrics/timeline/TimelineMetricStore.java   |  19 +-
 .../timeline/TimelineMetricStoreWatcher.java    |   7 +-
 .../timeline/query/ConditionBuilder.java        | 137 +++++++++++++
 .../timeline/query/DefaultCondition.java        | 126 +++++++-----
 .../timeline/query/PhoenixTransactSQL.java      | 203 ++++++++++++++-----
 .../metrics/timeline/query/TopNCondition.java   | 192 ++++++++++++++++++
 .../webapp/TimelineWebServices.java             |  70 ++-----
 .../timeline/TestPhoenixTransactSQL.java        |  58 ++++++
 .../timeline/TestTimelineMetricStore.java       |  11 +-
 .../TimelineMetricStoreWatcherTest.java         |  15 +-
 14 files changed, 767 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/47c7b5ef/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TopNConfig.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TopNConfig.java
 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TopNConfig.java
new file mode 100644
index 0000000..61127da
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TopNConfig.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "topnconfig")
+@XmlAccessorType(XmlAccessType.NONE)
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TopNConfig {
+  Integer topN;
+  String topNFunction;
+  Boolean isBottomN;
+
+  public TopNConfig(Integer topN, String topNFunction, Boolean isBottomN) {
+    this.setTopN(topN);
+    this.setTopNFunction(topNFunction);
+    this.setIsBottomN(isBottomN);
+  }
+
+  @XmlElement(name = "topn")
+  public Integer getTopN() {
+    return topN;
+  }
+
+  public void setTopN(Integer topN) {
+    this.topN = topN;
+  }
+
+  @XmlElement(name = "topnfunction")
+  public String getTopNFunction() {
+    return topNFunction;
+  }
+
+  public void setTopNFunction(String topNFunction) {
+    this.topNFunction = topNFunction;
+  }
+
+  @XmlElement(name = "isbottomn")
+  public Boolean getIsBottomN() {
+    return isBottomN;
+  }
+
+  public void setIsBottomN(Boolean isBottomN) {
+    this.isBottomN = isBottomN;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/47c7b5ef/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index 89c67d1..974f951 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
@@ -36,7 +37,8 @@ import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConditionBuilder;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -54,6 +56,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
 
 public class HBaseTimelineMetricStore extends AbstractService implements 
TimelineMetricStore {
@@ -66,6 +69,7 @@ public class HBaseTimelineMetricStore extends AbstractService 
implements Timelin
   private final Map<AGGREGATOR_NAME, ScheduledExecutorService> 
scheduledExecutors = new HashMap<>();
   private TimelineMetricMetadataManager metricMetadataManager;
   private TimelineMetricHAController haController;
+  private Integer defaultTopNHostsLimit;
 
   /**
    * Construct the service.
@@ -106,6 +110,7 @@ public class HBaseTimelineMetricStore extends 
AbstractService implements Timelin
         }
       }
 
+      defaultTopNHostsLimit = 
Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20"));
       if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, 
"true"))) {
         LOG.info("Using group by aggregators for aggregating host and cluster 
metrics.");
       }
@@ -177,7 +182,7 @@ public class HBaseTimelineMetricStore extends 
AbstractService implements Timelin
   public TimelineMetrics getTimelineMetrics(List<String> metricNames,
       List<String> hostnames, String applicationId, String instanceId,
       Long startTime, Long endTime, Precision precision, Integer limit,
-      boolean groupedByHosts) throws SQLException, IOException {
+      boolean groupedByHosts, TopNConfig topNConfig) throws SQLException, 
IOException {
 
     if (metricNames == null || metricNames.isEmpty()) {
       throw new IllegalArgumentException("No metric name filter specified.");
@@ -192,10 +197,34 @@ public class HBaseTimelineMetricStore extends 
AbstractService implements Timelin
     Map<String, List<Function>> metricFunctions =
       parseMetricNamesToAggregationFunctions(metricNames);
 
-    Condition condition = new DefaultCondition(
-      new ArrayList<String>(metricFunctions.keySet()),
-      hostnames, applicationId, instanceId, startTime, endTime,
-      precision, limit, groupedByHosts);
+    ConditionBuilder conditionBuilder = new ConditionBuilder(new 
ArrayList<String>(metricFunctions.keySet()))
+      .hostnames(hostnames)
+      .appId(applicationId)
+      .instanceId(instanceId)
+      .startTime(startTime)
+      .endTime(endTime)
+      .precision(precision)
+      .limit(limit)
+      .grouped(groupedByHosts);
+
+    if (topNConfig != null) {
+      if (TopNCondition.isTopNHostCondition(metricNames, hostnames) || 
TopNCondition.isTopNMetricCondition(metricNames, hostnames)) {
+        conditionBuilder.topN(topNConfig.getTopN());
+        conditionBuilder.isBottomN(topNConfig.getIsBottomN());
+        Function.ReadFunction readFunction = 
Function.ReadFunction.getFunction(topNConfig.getTopNFunction());
+        Function function = new Function(readFunction, null);
+        conditionBuilder.topNFunction(function);
+      } else {
+        LOG.info("Invalid Input for TopN query. Ignoring TopN Request.");
+      }
+    } else if (hostnames != null && hostnames.size() > defaultTopNHostsLimit) {
+      LOG.info("Requesting data for more than " + defaultTopNHostsLimit +  " 
Hosts. " +
+        "Defaulting to Top " + defaultTopNHostsLimit);
+      conditionBuilder.topN(defaultTopNHostsLimit);
+      conditionBuilder.isBottomN(false);
+    }
+
+    Condition condition = conditionBuilder.build();
 
     TimelineMetrics metrics;
 
@@ -279,54 +308,6 @@ public class HBaseTimelineMetricStore extends 
AbstractService implements Timelin
   }
 
   @Override
-  public TimelineMetric getTimelineMetric(String metricName, List<String> 
hostnames,
-      String applicationId, String instanceId, Long startTime,
-      Long endTime, Precision precision, Integer limit)
-      throws SQLException, IOException {
-
-    if (metricName == null || metricName.isEmpty()) {
-      throw new IllegalArgumentException("No metric name filter specified.");
-    }
-    if ((startTime == null && endTime != null)
-        || (startTime != null && endTime == null)) {
-      throw new IllegalArgumentException("Open ended query not supported ");
-    }
-    if (limit !=null && limit > PhoenixHBaseAccessor.RESULTSET_LIMIT){
-      throw new IllegalArgumentException("Limit too big");
-    }
-
-    Map<String, List<Function>> metricFunctions =
-      
parseMetricNamesToAggregationFunctions(Collections.singletonList(metricName));
-
-    Condition condition = new DefaultCondition(
-      new ArrayList<String>(metricFunctions.keySet()), hostnames, 
applicationId,
-      instanceId, startTime, endTime, precision, limit, true);
-    TimelineMetrics metrics = hBaseAccessor.getMetricRecords(condition,
-      metricFunctions);
-
-    metrics = postProcessMetrics(metrics);
-
-    TimelineMetric metric = new TimelineMetric();
-    List<TimelineMetric> metricList = metrics.getMetrics();
-
-    if (metricList != null && !metricList.isEmpty()) {
-      metric.setMetricName(metricList.get(0).getMetricName());
-      metric.setAppId(metricList.get(0).getAppId());
-      metric.setInstanceId(metricList.get(0).getInstanceId());
-      metric.setHostName(metricList.get(0).getHostName());
-      // Assumption that metrics are ordered by start time
-      metric.setStartTime(metricList.get(0).getStartTime());
-      TreeMap<Long, Double> metricRecords = new TreeMap<Long, Double>();
-      for (TimelineMetric timelineMetric : metricList) {
-        metricRecords.putAll(timelineMetric.getMetricValues());
-      }
-      metric.setMetricValues(metricRecords);
-    }
-
-    return metric;
-  }
-
-  @Override
   public TimelinePutResponse putMetrics(TimelineMetrics metrics) throws 
SQLException, IOException {
     // Error indicated by the Sql exception
     TimelinePutResponse response = new TimelinePutResponse();

http://git-wip-us.apache.org/repos/asf/ambari/blob/47c7b5ef/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index a0ecbcc..52ab083 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -604,12 +604,14 @@ public class PhoenixHBaseAccessor {
         try {
           metricRecordStmt.executeUpdate();
 
-          // Write to metadata cache on successful write to store
-          metadataManager.putIfModifiedTimelineMetricMetadata(
-            metadataManager.getTimelineMetricMetadata(metric));
+          if (metadataManager != null) {
+            // Write to metadata cache on successful write to store
+            metadataManager.putIfModifiedTimelineMetricMetadata(
+              metadataManager.getTimelineMetricMetadata(metric));
 
-          metadataManager.putIfModifiedHostedAppsMetadata(
-            metric.getHostName(), metric.getAppId());
+            metadataManager.putIfModifiedHostedAppsMetadata(
+              metric.getHostName(), metric.getAppId());
+          }
 
         } catch (SQLException sql) {
           LOG.error("Failed on insert records to store.", sql);

http://git-wip-us.apache.org/repos/asf/ambari/blob/47c7b5ef/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index cf30e24..683e5d4 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -234,6 +234,9 @@ public class TimelineMetricConfiguration {
   public static final String HBASE_BLOCKING_STORE_FILES =
     "hbase.hstore.blockingStoreFiles";
 
+  public static final String DEFAULT_TOPN_HOSTS_LIMIT =
+    "timeline.metrics.default.topn.hosts.limit";
+
   public static final String HOST_APP_ID = "HOST";
 
   public static final String DEFAULT_INSTANCE_PORT = "12001";

http://git-wip-us.apache.org/repos/asf/ambari/blob/47c7b5ef/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
index ded64e3..e37bc4d 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import java.io.IOException;
 import java.sql.SQLException;
@@ -48,21 +49,11 @@ public interface TimelineMetricStore {
    * @throws java.sql.SQLException
    */
   TimelineMetrics getTimelineMetrics(List<String> metricNames, List<String> 
hostnames,
-      String applicationId, String instanceId, Long startTime,
-      Long endTime, Precision precision, Integer limit, boolean groupedByHosts)
+                                     String applicationId, String instanceId, 
Long startTime,
+                                     Long endTime, Precision precision, 
Integer limit, boolean groupedByHosts,
+                                     TopNConfig topNConfig)
     throws SQLException, IOException;
 
-
-  /**
-   * Return all records for a single metric satisfying the filter criteria.
-   * @return {@link TimelineMetric}
-   */
-  TimelineMetric getTimelineMetric(String metricName, List<String> hostname,
-      String applicationId, String instanceId, Long startTime,
-      Long endTime, Precision precision, Integer limit)
-      throws SQLException, IOException;
-
-
   /**
    * Stores metric information to the timeline store. Any errors occurring for
    * individual put request objects will be reported in the response.
@@ -74,7 +65,7 @@ public interface TimelineMetricStore {
   TimelinePutResponse putMetrics(TimelineMetrics metrics) throws SQLException, 
IOException;
 
   /**
-   * Store container metric into the timeliens tore
+   * Store container metric into the timeline store
    */
   TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics)
       throws SQLException, IOException;

http://git-wip-us.apache.org/repos/asf/ambari/blob/47c7b5ef/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcher.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcher.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcher.java
index 632df3f..7d49070 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcher.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcher.java
@@ -102,10 +102,11 @@ public class TimelineMetricStoreWatcher implements 
Runnable {
     Callable<TimelineMetric> task = new Callable<TimelineMetric>() {
       public TimelineMetric call() throws Exception {
         timelineMetricStore.putMetrics(metrics);
-        return timelineMetricStore.getTimelineMetric(
-          FAKE_METRIC_NAME, Collections.singletonList(FAKE_HOSTNAME),
+        TimelineMetrics timelineMetrics = 
timelineMetricStore.getTimelineMetrics(
+          Collections.singletonList(FAKE_METRIC_NAME), 
Collections.singletonList(FAKE_HOSTNAME),
           FAKE_APP_ID, null, startTime - delay * 2 * 1000,
-          startTime + delay * 2 * 1000, Precision.SECONDS, 1);
+          startTime + delay * 2 * 1000, Precision.SECONDS, 1, true, null);
+        return timelineMetrics.getMetrics().get(0);
       }
     };
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/47c7b5ef/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConditionBuilder.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConditionBuilder.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConditionBuilder.java
new file mode 100644
index 0000000..32c1e84
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConditionBuilder.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ConditionBuilder {
+
+  private List<String> metricNames;
+  private List<String> hostnames;
+  private String appId;
+  private String instanceId;
+  private Long startTime;
+  private Long endTime;
+  private Precision precision;
+  private Integer limit;
+  private boolean grouped;
+  private boolean noLimit = false;
+  private Integer fetchSize;
+  private String statement;
+  private Set<String> orderByColumns = new LinkedHashSet<String>();
+  private Integer topN;
+  private boolean isBottomN;
+  private Function topNFunction;
+
+  public ConditionBuilder(List<String> metricNames) {
+    this.metricNames = metricNames;
+  }
+
+  public ConditionBuilder hostnames(List<String> hostnames) {
+    this.hostnames = hostnames;
+    return this;
+  }
+
+  public ConditionBuilder appId(String appId) {
+    this.appId = appId;
+    return this;
+  }
+
+  public ConditionBuilder instanceId(String instanceId) {
+    this.instanceId = instanceId;
+    return this;
+  }
+
+  public ConditionBuilder startTime(Long startTime) {
+    this.startTime = startTime;
+    return this;
+  }
+
+  public ConditionBuilder endTime(Long endTime) {
+    this.endTime = endTime;
+    return this;
+  }
+
+  public ConditionBuilder precision(Precision precision) {
+    this.precision = precision;
+    return this;
+  }
+
+  public ConditionBuilder limit(Integer limit) {
+    this.limit = limit;
+    return this;
+  }
+
+  public ConditionBuilder grouped(boolean grouped) {
+    this.grouped = grouped;
+    return this;
+  }
+
+  public ConditionBuilder noLimit(boolean noLimit) {
+    this.noLimit = noLimit;
+    return this;
+  }
+
+  public ConditionBuilder fetchSize(Integer fetchSize) {
+    this.fetchSize = fetchSize;
+    return this;
+  }
+
+  public ConditionBuilder statement(String statement) {
+    this.statement = statement;
+    return this;
+  }
+
+  public ConditionBuilder orderByColumns(Set<String> orderByColumns) {
+    this.orderByColumns = orderByColumns;
+    return this;
+  }
+
+  public ConditionBuilder topN(Integer topN) {
+    this.topN = topN;
+    return this;
+  }
+
+  public ConditionBuilder isBottomN(boolean isBottomN) {
+    this.isBottomN = isBottomN;
+    return this;
+  }
+
+  public ConditionBuilder topNFunction(Function topNFunction) {
+    this.topNFunction = topNFunction;
+    return this;
+  }
+
+  public Condition build() {
+    if (topN == null) {
+      return new DefaultCondition(
+        metricNames,
+        hostnames, appId, instanceId, startTime, endTime,
+        precision, limit, grouped);
+    } else {
+      return new TopNCondition(metricNames, hostnames, appId, instanceId,
+        startTime, endTime, precision, limit, grouped, topN, topNFunction, 
isBottomN);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/47c7b5ef/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java
index 99a6125..b1159c3 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java
@@ -16,6 +16,9 @@
  * limitations under the License.
  */
 package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 
@@ -38,6 +41,8 @@ public class DefaultCondition implements Condition {
   String statement;
   Set<String> orderByColumns = new LinkedHashSet<String>();
 
+  private static final Log LOG = LogFactory.getLog(DefaultCondition.class);
+
   public DefaultCondition(List<String> metricNames, List<String> hostnames, 
String appId,
                    String instanceId, Long startTime, Long endTime, Precision 
precision,
                    Integer limit, boolean grouped) {
@@ -66,62 +71,11 @@ public class DefaultCondition implements Condition {
 
   public StringBuilder getConditionClause() {
     StringBuilder sb = new StringBuilder();
-    boolean appendConjunction = false;
-    StringBuilder metricsLike = new StringBuilder();
-    StringBuilder metricsIn = new StringBuilder();
 
-    if (getMetricNames() != null) {
-      for (String name : getMetricNames()) {
-        if (name.contains("%")) {
-          if (metricsLike.length() > 1) {
-            metricsLike.append(" OR ");
-          }
-          metricsLike.append("METRIC_NAME LIKE ?");
-        } else {
-          if (metricsIn.length() > 0) {
-            metricsIn.append(", ");
-          }
-          metricsIn.append("?");
-        }
-      }
-
-      if (metricsIn.length()>0) {
-        sb.append("(METRIC_NAME IN (");
-        sb.append(metricsIn);
-        sb.append(")");
-        appendConjunction = true;
-      }
-
-      if (metricsLike.length() > 0) {
-        if (appendConjunction) {
-          sb.append(" OR ");
-        } else {
-          sb.append("(");
-        }
-        sb.append(metricsLike);
-        appendConjunction = true;
-      }
+    boolean appendConjunction = appendMetricNameClause(sb);
 
-      if (appendConjunction) {
-        sb.append(")");
-      }
-    }
+    appendConjunction = appendHostnameClause(sb, appendConjunction);
 
-    if (hostnames != null && getHostnames().size() > 1) {
-      StringBuilder hostnamesCondition = new StringBuilder();
-      for (String hostname: getHostnames()) {
-        if (hostnamesCondition.length() > 0) {
-          hostnamesCondition.append(" ,");
-        } else {
-          hostnamesCondition.append(" HOSTNAME IN (");
-        }
-        hostnamesCondition.append('?');
-      }
-      hostnamesCondition.append(')');
-      appendConjunction = append(sb, appendConjunction, getHostnames(), 
hostnamesCondition.toString());
-    } else {
-      appendConjunction = append(sb, appendConjunction, getHostnames(), " 
HOSTNAME = ?");
-    }
     appendConjunction = append(sb, appendConjunction, getAppId(), " APP_ID = 
?");
     appendConjunction = append(sb, appendConjunction, getInstanceId(), " 
INSTANCE_ID = ?");
     appendConjunction = append(sb, appendConjunction, getStartTime(), " 
SERVER_TIME >= ?");
@@ -259,6 +213,72 @@ public class DefaultCondition implements Condition {
     return null;
   }
 
+  protected boolean appendMetricNameClause(StringBuilder sb) {
+    boolean appendConjunction = false;
+    StringBuilder metricsLike = new StringBuilder();
+    StringBuilder metricsIn = new StringBuilder();
+
+    if (getMetricNames() != null) {
+      for (String name : getMetricNames()) {
+        if (name.contains("%")) {
+          if (metricsLike.length() > 1) {
+            metricsLike.append(" OR ");
+          }
+          metricsLike.append("METRIC_NAME LIKE ?");
+        } else {
+          if (metricsIn.length() > 0) {
+            metricsIn.append(", ");
+          }
+          metricsIn.append("?");
+        }
+      }
+
+      if (metricsIn.length()>0) {
+        sb.append("(METRIC_NAME IN (");
+        sb.append(metricsIn);
+        sb.append(")");
+        appendConjunction = true;
+      }
+
+      if (metricsLike.length() > 0) {
+        if (appendConjunction) {
+          sb.append(" OR ");
+        } else {
+          sb.append("(");
+        }
+        sb.append(metricsLike);
+        appendConjunction = true;
+      }
+
+      if (appendConjunction) {
+        sb.append(")");
+      }
+    }
+    return appendConjunction;
+  }
+
+  protected boolean appendHostnameClause(StringBuilder sb, boolean 
appendConjunction) {
+
+    if (hostnames != null && getHostnames().size() > 1) {
+      StringBuilder hostnamesCondition = new StringBuilder();
+
+      for (String hostname : getHostnames()) {
+        if (hostnamesCondition.length() > 0) {
+          hostnamesCondition.append(" ,");
+        } else {
+          hostnamesCondition.append(" HOSTNAME IN (");
+        }
+        hostnamesCondition.append('?');
+      }
+      hostnamesCondition.append(')');
+      appendConjunction = append(sb, appendConjunction, getHostnames(), 
hostnamesCondition.toString());
+
+    } else {
+      appendConjunction = append(sb, appendConjunction, getHostnames(), " 
HOSTNAME = ?");
+    }
+    return appendConjunction;
+  }
+
   @Override
   public String toString() {
     return "Condition{" +

http://git-wip-us.apache.org/repos/asf/ambari/blob/47c7b5ef/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
index 6ee0006..177e444 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
@@ -298,6 +298,9 @@ public class PhoenixTransactSQL {
     "METRIC_MIN " +
     "FROM %s";
 
+  /**
+   * Template for the inner sub-query of a TopN condition. The %s slots are
+   * filled, in order, with: the time range hint, the selected column, the
+   * table name, the WHERE clause, the GROUP BY columns, the ORDER BY
+   * expression and the LIMIT value.
+   */
+  public static final String TOP_N_INNER_SQL = "SELECT %s %s " +
+    "FROM %s WHERE %s GROUP BY %s ORDER BY %s LIMIT %s";
+
   public static final String GET_METRIC_METADATA_SQL = "SELECT " +
     "METRIC_NAME, APP_ID, UNITS, TYPE, START_TIME, " +
     "SUPPORTS_AGGREGATION FROM METRICS_METADATA";
@@ -457,46 +460,33 @@ public class PhoenixTransactSQL {
     try {
       stmt = connection.prepareStatement(sb.toString());
       int pos = 1;
-      if (condition.getMetricNames() != null) {
-        for (; pos <= condition.getMetricNames().size(); pos++) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Setting pos: " + pos + ", value = " + 
condition.getMetricNames().get(pos - 1));
-          }
-          stmt.setString(pos, condition.getMetricNames().get(pos - 1));
-        }
-      }
-      if (condition.getHostnames() != null) {
-        for (String hostname : condition.getHostnames()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Setting pos: " + pos + ", value: " + hostname);
-          }
-          stmt.setString(pos++, hostname);
-        }
-      }
-      if (condition.getAppId() != null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Setting pos: " + pos + ", value: " + 
condition.getAppId());
-        }
-        stmt.setString(pos++, condition.getAppId());
-      }
-      if (condition.getInstanceId() != null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Setting pos: " + pos + ", value: " + 
condition.getInstanceId());
-        }
-        stmt.setString(pos++, condition.getInstanceId());
-      }
-      if (condition.getStartTime() != null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Setting pos: " + pos + ", value: " + 
condition.getStartTime());
+      pos = addMetricNames(condition, pos, stmt);
+
+      if (condition instanceof TopNCondition) {
+        TopNCondition topNCondition = (TopNCondition) condition;
+        if (topNCondition.isTopNHostCondition()) {
+          pos = addMetricNames(condition, pos, stmt);
         }
-        stmt.setLong(pos++, condition.getStartTime());
       }
-      if (condition.getEndTime() != null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Setting pos: " + pos + ", value: " + 
condition.getEndTime());
+
+      pos = addHostNames(condition, pos, stmt);
+
+      if (condition instanceof TopNCondition) {
+        pos = addAppId(condition, pos, stmt);
+        pos = addInstanceId(condition, pos, stmt);
+        pos = addStartTime(condition, pos, stmt);
+        pos = addEndTime(condition, pos, stmt);
+        TopNCondition topNCondition = (TopNCondition) condition;
+        if (topNCondition.isTopNMetricCondition()) {
+          pos = addHostNames(condition, pos, stmt);
         }
-        stmt.setLong(pos, condition.getEndTime());
       }
+
+      pos = addAppId(condition, pos, stmt);
+      pos = addInstanceId(condition, pos, stmt);
+      pos = addStartTime(condition, pos, stmt);
+      addEndTime(condition, pos, stmt);
+
       if (condition.getFetchSize() != null) {
         stmt.setFetchSize(condition.getFetchSize());
       }
@@ -696,24 +686,20 @@ public class PhoenixTransactSQL {
       stmt = connection.prepareStatement(query);
       int pos = 1;
 
-      if (condition.getMetricNames() != null) {
-        for (; pos <= condition.getMetricNames().size(); pos++) {
-          stmt.setString(pos, condition.getMetricNames().get(pos - 1));
-        }
+      pos = addMetricNames(condition, pos, stmt);
+
+      if (condition instanceof TopNCondition) {
+        pos = addAppId(condition, pos, stmt);
+        pos = addInstanceId(condition, pos, stmt);
+        pos = addStartTime(condition, pos, stmt);
+        pos = addEndTime(condition, pos, stmt);
       }
+
       // TODO: Upper case all strings on POST
-      if (condition.getAppId() != null) {
-        stmt.setString(pos++, condition.getAppId());
-      }
-      if (condition.getInstanceId() != null) {
-        stmt.setString(pos++, condition.getInstanceId());
-      }
-      if (condition.getStartTime() != null) {
-        stmt.setLong(pos++, condition.getStartTime());
-      }
-      if (condition.getEndTime() != null) {
-        stmt.setLong(pos, condition.getEndTime());
-      }
+      pos = addAppId(condition, pos, stmt);
+      pos = addInstanceId(condition, pos, stmt);
+      pos = addStartTime(condition, pos, stmt);
+      pos = addEndTime(condition, pos, stmt);
     } catch (SQLException e) {
       if (stmt != null) {
         stmt.close();
@@ -781,4 +767,117 @@ public class PhoenixTransactSQL {
 
     return stmt;
   }
+
+  /**
+   * Resolves the table to query for a given precision.
+   *
+   * @param precision requested precision; null selects the un-aggregated
+   *                  table of the chosen family
+   * @param withHosts true to pick from the per-host tables, false to pick
+   *                  from the cluster aggregate tables
+   * @return the name of the table to read from
+   */
+  public static String getTargetTableUsingPrecision(Precision precision, boolean withHosts) {
+    if (precision == null) {
+      return withHosts
+        ? METRICS_RECORD_TABLE_NAME
+        : METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+    }
+
+    if (withHosts) {
+      switch (precision) {
+        case DAYS:
+          return METRICS_AGGREGATE_DAILY_TABLE_NAME;
+        case HOURS:
+          return METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+        case MINUTES:
+          return METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+        default:
+          return METRICS_RECORD_TABLE_NAME;
+      }
+    }
+
+    switch (precision) {
+      case DAYS:
+        return METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
+      case HOURS:
+        return METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+      case MINUTES:
+        return METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
+      default:
+        return METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+    }
+  }
+
+  /**
+   * Binds every metric name of the condition to consecutive statement
+   * parameters, starting at the given position.
+   *
+   * @param condition source of the metric names (may carry none)
+   * @param pos 1-based index of the first parameter to bind
+   * @param stmt statement receiving the values
+   * @return the index following the last parameter bound
+   * @throws SQLException if the statement rejects a parameter
+   */
+  private static int addMetricNames(Condition condition, int pos, PreparedStatement stmt) throws SQLException {
+    if (condition.getMetricNames() == null) {
+      return pos;
+    }
+
+    int nextPos = pos;
+    for (String metricName : condition.getMetricNames()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + nextPos + ", value = " + metricName);
+      }
+      stmt.setString(nextPos++, metricName);
+    }
+    return nextPos;
+  }
+
+  /**
+   * Binds every hostname of the condition to consecutive statement
+   * parameters, starting at the given position.
+   *
+   * @param condition source of the hostnames (may carry none)
+   * @param pos 1-based index of the first parameter to bind
+   * @param stmt statement receiving the values
+   * @return the index following the last parameter bound
+   * @throws SQLException if the statement rejects a parameter
+   */
+  private static int addHostNames(Condition condition, int pos, PreparedStatement stmt) throws SQLException {
+    int i = pos;
+    if (condition.getHostnames() != null) {
+      for (String hostname : condition.getHostnames()) {
+        if (LOG.isDebugEnabled()) {
+          // Log the index actually being bound, not the starting position.
+          LOG.debug("Setting pos: " + i + ", value: " + hostname);
+        }
+        stmt.setString(i++, hostname);
+      }
+    }
+    return i;
+  }
+
+
+  /**
+   * Binds the application id, when present, at the given position.
+   *
+   * @param condition source of the application id
+   * @param pos 1-based index of the parameter to bind
+   * @param stmt statement receiving the value
+   * @return pos + 1 if a value was bound, otherwise pos unchanged
+   * @throws SQLException if the statement rejects the parameter
+   */
+  private static int addAppId(Condition condition, int pos, PreparedStatement stmt) throws SQLException {
+    if (condition.getAppId() == null) {
+      return pos;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId());
+    }
+    stmt.setString(pos, condition.getAppId());
+    return pos + 1;
+  }
+
+  /**
+   * Binds the instance id, when present, at the given position.
+   *
+   * @param condition source of the instance id
+   * @param pos 1-based index of the parameter to bind
+   * @param stmt statement receiving the value
+   * @return pos + 1 if a value was bound, otherwise pos unchanged
+   * @throws SQLException if the statement rejects the parameter
+   */
+  private static int addInstanceId(Condition condition, int pos, PreparedStatement stmt) throws SQLException {
+    if (condition.getInstanceId() == null) {
+      return pos;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId());
+    }
+    stmt.setString(pos, condition.getInstanceId());
+    return pos + 1;
+  }
+
+  /**
+   * Binds the start time, when present, at the given position.
+   *
+   * @param condition source of the start time
+   * @param pos 1-based index of the parameter to bind
+   * @param stmt statement receiving the value
+   * @return pos + 1 if a value was bound, otherwise pos unchanged
+   * @throws SQLException if the statement rejects the parameter
+   */
+  private static int addStartTime(Condition condition, int pos, PreparedStatement stmt) throws SQLException {
+    if (condition.getStartTime() == null) {
+      return pos;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime());
+    }
+    stmt.setLong(pos, condition.getStartTime());
+    return pos + 1;
+  }
+
+  /**
+   * Binds the end time, when present, at the given position.
+   *
+   * @param condition source of the end time
+   * @param pos 1-based index of the parameter to bind
+   * @param stmt statement receiving the value
+   * @return pos + 1 if a value was bound, otherwise pos unchanged
+   * @throws SQLException if the statement rejects the parameter
+   */
+  private static int addEndTime(Condition condition, int pos, PreparedStatement stmt) throws SQLException {
+    if (condition.getEndTime() == null) {
+      return pos;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime());
+    }
+    stmt.setLong(pos, condition.getEndTime());
+    return pos + 1;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/47c7b5ef/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/TopNCondition.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/TopNCondition.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/TopNCondition.java
new file mode 100644
index 0000000..fae1655
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/TopNCondition.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
+import java.util.List;
+
+/**
+ * Condition for TopN / BottomN metric queries.
+ * <p>
+ * The HOSTNAME (or METRIC_NAME) filter of the generated clause is an inner
+ * "SELECT ... GROUP BY ... ORDER BY ... LIMIT N" sub-query built from the
+ * regular {@link DefaultCondition} clause. Two shapes are supported:
+ * <ul>
+ *   <li>exactly one metric and a non-empty host list: hosts are ranked;</li>
+ *   <li>more than one metric and at most one host: metrics are ranked.</li>
+ * </ul>
+ * Any other combination is rejected by {@link #getConditionClause()}.
+ */
+public class TopNCondition extends DefaultCondition {
+
+  private static final Log LOG = LogFactory.getLog(TopNCondition.class);
+
+  private Integer topN;
+  private boolean isBottomN;
+  private Function topNFunction;
+
+  public TopNCondition(List<String> metricNames, List<String> hostnames, String appId,
+                          String instanceId, Long startTime, Long endTime, Precision precision,
+                          Integer limit, boolean grouped, Integer topN, Function topNFunction,
+                          boolean isBottomN) {
+    super(metricNames, hostnames, appId, instanceId, startTime, endTime, precision, limit, grouped);
+    this.topN = topN;
+    this.isBottomN = isBottomN;
+    this.topNFunction = topNFunction;
+  }
+
+  /**
+   * Builds the outer condition clause with the TopN sub-query embedded.
+   *
+   * @return the clause, or null when the metric/host combination is neither
+   *         a TopN-hosts nor a TopN-metrics request
+   */
+  @Override
+  public StringBuilder getConditionClause() {
+    StringBuilder sb = new StringBuilder();
+    boolean appendConjunction = false;
+
+    if (isTopNHostCondition(metricNames, hostnames)) {
+      appendConjunction = appendMetricNameClause(sb);
+
+      String hostFilter = " HOSTNAME IN (" + getTopNInnerQuery() + ")";
+      appendConjunction = append(sb, appendConjunction, getHostnames(), hostFilter);
+
+    } else if (isTopNMetricCondition(metricNames, hostnames)) {
+
+      String metricFilter = " METRIC_NAME IN (" + getTopNInnerQuery() + ")";
+      appendConjunction = append(sb, appendConjunction, getMetricNames(), metricFilter);
+      appendConjunction = appendHostnameClause(sb, appendConjunction);
+    } else {
+      LOG.error("Unsupported TopN Operation requested. Query can have either multiple hosts or multiple metric names " +
+        "but not both.");
+      return null;
+    }
+
+    appendConjunction = append(sb, appendConjunction, getAppId(), " APP_ID = ?");
+    appendConjunction = append(sb, appendConjunction, getInstanceId(), " INSTANCE_ID = ?");
+    appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?");
+    append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?");
+
+    return sb;
+  }
+
+  /**
+   * Builds the inner sub-query that selects the top (or bottom) N hostnames
+   * or metric names.
+   *
+   * @return the sub-query text, or null when the metric/host combination is
+   *         not a supported TopN shape
+   */
+  public String getTopNInnerQuery() {
+    final String selectColumn;
+    final String groupByClause;
+    final boolean withHosts;
+
+    if (isTopNHostCondition(metricNames, hostnames)) {
+      selectColumn = "HOSTNAME";
+      groupByClause = "METRIC_NAME, HOSTNAME, APP_ID";
+      withHosts = true;
+    } else if (isTopNMetricCondition(metricNames, hostnames)) {
+      selectColumn = "METRIC_NAME";
+      groupByClause = "METRIC_NAME, APP_ID";
+      // Without exactly one host the cluster aggregate tables are queried.
+      withHosts = hostnames != null && hostnames.size() == 1;
+    } else {
+      return null;
+    }
+
+    return String.format(PhoenixTransactSQL.TOP_N_INNER_SQL,
+      PhoenixTransactSQL.getNaiveTimeRangeHint(getStartTime(), NATIVE_TIME_RANGE_DELTA),
+      selectColumn,
+      PhoenixTransactSQL.getTargetTableUsingPrecision(precision, withHosts),
+      super.getConditionClause().toString(),
+      groupByClause, getTopNOrderByClause(), topN);
+  }
+
+  /**
+   * Chooses the ORDER BY expression for the inner query. Falls back to
+   * MAX(METRIC_MAX) when no function (or no read function) is set, and sorts
+   * descending unless a BottomN query was requested.
+   */
+  private String getTopNOrderByClause() {
+
+    String orderByClause = "MAX(METRIC_MAX)";
+
+    // Guard the switch: switching on a null enum value would throw.
+    if (topNFunction != null && topNFunction.getReadFunction() != null) {
+      switch (topNFunction.getReadFunction()) {
+        case AVG:
+          orderByClause = "ROUND(AVG(METRIC_SUM),2)";
+          break;
+        case SUM:
+          orderByClause = "SUM(METRIC_SUM)";
+          break;
+        default:
+          break;
+      }
+    }
+
+    if (!isBottomN) {
+      orderByClause += " DESC";
+    }
+
+    return orderByClause;
+  }
+
+  public boolean isTopNHostCondition() {
+    return isTopNHostCondition(metricNames, hostnames);
+  }
+
+  public boolean isTopNMetricCondition() {
+    return isTopNMetricCondition(metricNames, hostnames);
+  }
+
+  /**
+   * Check if this is a case of Top N hosts condition
+   * @param metricNames A list of Strings, may be null.
+   * @param hostnames A list of Strings, may be null.
+   * @return True if it is a Case of Top N Hosts (1 Metric and H hosts).
+   */
+  public static boolean isTopNHostCondition(List<String> metricNames, List<String> hostnames) {
+    // Case 1 : 1 Metric, H hosts
+    // Select Top N or Bottom N host series based on 1 metric (max/avg/sum)
+    return metricNames != null && metricNames.size() == 1
+      && CollectionUtils.isNotEmpty(hostnames);
+  }
+
+  /**
+   * Check if this is a case of Top N metrics condition
+   * @param metricNames A list of Strings, may be null.
+   * @param hostnames A list of Strings, may be null.
+   * @return True if it is a Case of Top N Metrics (M Metric and 1 or 0 host).
+   */
+  public static boolean isTopNMetricCondition(List<String> metricNames, List<String> hostnames) {
+    // Case 2 : M Metric names or Regex, 1 or No host
+    // Select Top N or Bottom N metric series based on metric values(max/avg/sum)
+    return metricNames != null && metricNames.size() > 1
+      && (hostnames == null || hostnames.size() <= 1);
+  }
+
+  public Integer getTopN() {
+    return topN;
+  }
+
+  public void setTopN(Integer topN) {
+    this.topN = topN;
+  }
+
+  public boolean isBottomN() {
+    return isBottomN;
+  }
+
+  public void setIsBottomN(boolean isBottomN) {
+    this.isBottomN = isBottomN;
+  }
+
+  public Function getTopNFunction() {
+    return topNFunction;
+  }
+
+  public void setTopNFunction(Function topNFunction) {
+    this.topNFunction = topNFunction;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/47c7b5ef/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
index 4cfc415..bac8d16 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
 import 
org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
@@ -311,52 +312,6 @@ public class TimelineWebServices {
   }
 
   /**
-   * Query for a particular metric satisfying the filter criteria.
-   * @return {@link TimelineMetric}
-   */
-  @GET
-  @Path("/metrics/{metricName}")
-  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
-  public TimelineMetric getTimelineMetric(
-    @Context HttpServletRequest req,
-    @Context HttpServletResponse res,
-    @PathParam("metricName") String metricName,
-    @QueryParam("appId") String appId,
-    @QueryParam("instanceId") String instanceId,
-    @QueryParam("hostname") String hostname,
-    @QueryParam("startTime") String startTime,
-    @QueryParam("endTime") String endTime,
-    @QueryParam("precision") String precision,
-    @QueryParam("limit") String limit
-  ) {
-    init(res);
-    try {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Request for metrics => metricName: " + metricName + ", " +
-          "appId: " + appId + ", instanceId: " + instanceId + ", " +
-          "hostname: " + hostname + ", startTime: " + startTime + ", " +
-          "endTime: " + endTime);
-      }
-
-      return timelineMetricStore.getTimelineMetric(metricName,
-        parseListStr(hostname, ","), appId, instanceId, 
parseLongStr(startTime),
-        parseLongStr(endTime), Precision.getPrecision(precision),
-        parseIntStr(limit));
-    } catch (NumberFormatException ne) {
-      throw new BadRequestException("startTime, endTime and limit should be " +
-        "numeric values");
-    } catch (Precision.PrecisionFormatException pfe) {
-      throw new BadRequestException("precision should be seconds, minutes " +
-        "or hours");
-    } catch (IllegalArgumentException iae) {
-      throw new BadRequestException(iae.getMessage());
-    } catch (SQLException | IOException sql) {
-      throw new WebApplicationException(sql,
-        Response.Status.INTERNAL_SERVER_ERROR);
-    }
-  }
-
-  /**
    * Query for a set of different metrics satisfying the filter criteria.
    * All query params are optional. The default limit will apply if none
    * specified.
@@ -385,7 +340,10 @@ public class TimelineWebServices {
     @QueryParam("endTime") String endTime,
     @QueryParam("precision") String precision,
     @QueryParam("limit") String limit,
-    @QueryParam("grouped") String grouped
+    @QueryParam("grouped") String grouped,
+    @QueryParam("topN") String topN,
+    @QueryParam("topNFunction") String topNFunction,
+    @QueryParam("isBottomN") String isBottomN
   ) {
     init(res);
     try {
@@ -401,7 +359,7 @@ public class TimelineWebServices {
         parseListStr(metricNames, ","), parseListStr(hostname, ","), appId, 
instanceId,
         parseLongStr(startTime), parseLongStr(endTime),
         Precision.getPrecision(precision), parseIntStr(limit),
-        parseBoolean(grouped));
+        parseBoolean(grouped), parseTopNConfig(topN, topNFunction, isBottomN));
 
     } catch (NumberFormatException ne) {
       throw new BadRequestException("startTime and limit should be numeric " +
@@ -587,6 +545,22 @@ public class TimelineWebServices {
     return booleanStr == null || Boolean.parseBoolean(booleanStr);
   }
 
+  /**
+   * Builds the optional TopN configuration from the raw query parameters.
+   *
+   * @param topN number of series requested; null or empty means no TopN
+   * @param topNFunction name of the ranking function, passed through as is
+   * @param bottomN "true" (case-insensitive) to request the bottom N instead
+   * @return the configuration, or null when no usable TopN value was given
+   * @throws NumberFormatException presumably raised by parseIntStr for a
+   *         non-numeric topN - confirm against that helper
+   */
+  private static TopNConfig parseTopNConfig(String topN, String topNFunction,
+                                            String bottomN) {
+    if (topN == null || topN.isEmpty()) {
+      return null;
+    }
+    Integer topNValue = parseIntStr(topN);
+
+    // Reject negatives as well as zero: a non-positive N is not a usable
+    // limit. The null check avoids an unboxing failure should parseIntStr
+    // return null.
+    if (topNValue == null || topNValue <= 0) {
+      LOG.info("Invalid Input for TopN query. Ignoring TopN Request.");
+      return null;
+    }
+
+    // Boolean.parseBoolean(null) is false, so no separate null check needed.
+    Boolean isBottomN = Boolean.parseBoolean(bottomN);
+    return new TopNConfig(topNValue, topNFunction, isBottomN);
+  }
+
   /**
    * Parses delimited string to list of strings. It skips strings that are
    * effectively empty (i.e. only whitespace).

http://git-wip-us.apache.org/repos/asf/ambari/blob/47c7b5ef/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
index 9c6617c..888234f 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
@@ -23,6 +23,7 @@ import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition;
 import org.easymock.Capture;
 import org.junit.Assert;
 import org.junit.Test;
@@ -554,4 +555,61 @@ public class TestPhoenixTransactSQL {
     Assert.assertTrue(exceptionThrown);
     Assert.assertTrue(requestedSizeFoundInMessage);
   }
+
+  /**
+   * One metric with several hosts: the HOSTNAME filter of the clause must be
+   * the TopN inner query ranking the hosts.
+   */
+  @Test
+  public void testTopNHostsConditionClause() throws Exception {
+    List<String> hosts = Arrays.asList("h1", "h2", "h3", "h4");
+
+    Condition condition = new TopNCondition(
+      Arrays.asList("cpu_user"), hosts,
+      "a1", "i1", 1407959718L, 1407959918L, null, null, false, 2, null, false);
+
+    String conditionClause = condition.getConditionClause().toString();
+    String expectedClause = "(METRIC_NAME IN (?)) AND HOSTNAME IN (" +
+      "SELECT " + PhoenixTransactSQL.getNaiveTimeRangeHint(condition.getStartTime(),120000L) +
+      " HOSTNAME FROM METRIC_RECORD WHERE " +
+          "(METRIC_NAME IN (?)) AND " +
+          "HOSTNAME IN (? ,? ,? ,?) AND " +
+          "APP_ID = ? AND INSTANCE_ID = ? AND " +
+          "SERVER_TIME >= ? AND SERVER_TIME < ? " +
+          "GROUP BY METRIC_NAME, HOSTNAME, APP_ID ORDER BY MAX(METRIC_MAX) DESC LIMIT 2) " +
+      "AND APP_ID = ? AND INSTANCE_ID = ? AND SERVER_TIME >= ? AND SERVER_TIME < ?";
+
+    // JUnit expects (expected, actual); the reverse gives misleading messages.
+    Assert.assertEquals(expectedClause, conditionClause);
+  }
+
+  /**
+   * Several metrics with a single host: the METRIC_NAME filter of the clause
+   * must be the TopN inner query ranking the metrics.
+   */
+  @Test
+  public void testTopNMetricsConditionClause() throws Exception {
+    List<String> metricNames = Arrays.asList("m1", "m2", "m3");
+
+    Condition condition = new TopNCondition(
+      metricNames, Collections.singletonList("h1"),
+      "a1", "i1", 1407959718L, 1407959918L, null, null, false, 2, null, false);
+
+    String conditionClause = condition.getConditionClause().toString();
+    String expectedClause = " METRIC_NAME IN (" +
+      "SELECT " + PhoenixTransactSQL.getNaiveTimeRangeHint(condition.getStartTime(),120000L) +
+      " METRIC_NAME FROM METRIC_RECORD WHERE " +
+      "(METRIC_NAME IN (?, ?, ?)) AND " +
+      "HOSTNAME = ? AND " +
+      "APP_ID = ? AND INSTANCE_ID = ? AND " +
+      "SERVER_TIME >= ? AND SERVER_TIME < ? " +
+      "GROUP BY METRIC_NAME, APP_ID ORDER BY MAX(METRIC_MAX) DESC LIMIT 2) " +
+      "AND HOSTNAME = ? AND APP_ID = ? AND INSTANCE_ID = ? AND SERVER_TIME >= ? AND SERVER_TIME < ?";
+
+    // JUnit expects (expected, actual); the reverse gives misleading messages.
+    Assert.assertEquals(expectedClause, conditionClause);
+  }
+
+  /**
+   * Several metrics combined with several hosts is not a supported TopN
+   * shape: no condition clause may be produced.
+   */
+  @Test
+  public void testTopNMetricsIllegalConditionClause() throws Exception {
+    List<String> metricNames = Arrays.asList("m1", "m2");
+
+    List<String> hosts = Arrays.asList("h1", "h2");
+
+    Condition condition = new TopNCondition(
+      metricNames, hosts,
+      "a1", "i1", 1407959718L, 1407959918L, null, null, false, 2, null, false);
+
+    Assert.assertNull(condition.getConditionClause());
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/47c7b5ef/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
index 16bbf0e..cfd1f58 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import java.io.IOException;
 import java.sql.SQLException;
@@ -37,7 +38,7 @@ public class TestTimelineMetricStore implements 
TimelineMetricStore {
   @Override
   public TimelineMetrics getTimelineMetrics(List<String> metricNames,
       List<String> hostnames, String applicationId, String instanceId, Long 
startTime,
-      Long endTime, Precision precision, Integer limit, boolean groupedByHost) 
throws SQLException,
+      Long endTime, Precision precision, Integer limit, boolean groupedByHost, 
TopNConfig topNConfig) throws SQLException,
     IOException {
     TimelineMetrics timelineMetrics = new TimelineMetrics();
     List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
@@ -72,14 +73,6 @@ public class TestTimelineMetricStore implements 
TimelineMetricStore {
   }
 
   @Override
-  public TimelineMetric getTimelineMetric(String metricName, List<String> 
hostname,
-      String applicationId, String instanceId, Long startTime, Long endTime,
-      Precision precision, Integer limit) throws SQLException, IOException {
-
-    return null;
-  }
-
-  @Override
   public TimelinePutResponse putMetrics(TimelineMetrics metrics)
       throws SQLException, IOException {
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/47c7b5ef/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java
index 277a98c..a94f4c5 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java
@@ -20,6 +20,7 @@ package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.easymock.EasyMock;
@@ -53,11 +54,11 @@ public class TimelineMetricStoreWatcherTest {
       .andReturn(new TimelinePutResponse());
 
     // metric found
-    expect(metricStore.getTimelineMetric(anyObject(String.class),
-      EasyMock.<List<String>> anyObject(), anyObject(String.class),
+    expect(metricStore.getTimelineMetrics(EasyMock.<List<String>>anyObject(),
+      EasyMock.<List<String>>anyObject(), anyObject(String.class),
       anyObject(String.class), anyObject(Long.class), anyObject(Long.class),
-      eq(Precision.SECONDS), eq(1)))
-      .andReturn(new TimelineMetric()).anyTimes();
+      eq(Precision.SECONDS), eq(1), eq(true), anyObject(TopNConfig.class)))
+      .andReturn(null).anyTimes();
 
     mockStatic(ExitUtil.class);
 
@@ -80,10 +81,10 @@ public class TimelineMetricStoreWatcherTest {
       .andReturn(new TimelinePutResponse());
 
     // no metrics found
-    expect(metricStore.getTimelineMetric(anyObject(String.class),
-      EasyMock.<List<String>> anyObject(), anyObject(String.class),
+    expect(metricStore.getTimelineMetrics(EasyMock.<List<String>>anyObject(),
+      EasyMock.<List<String>>anyObject(), anyObject(String.class),
       anyObject(String.class), anyObject(Long.class), anyObject(Long.class),
-      eq(Precision.SECONDS), eq(1)))
+      eq(Precision.SECONDS), eq(1), eq(true), anyObject(TopNConfig.class)))
       .andReturn(null).anyTimes();
 
     String msg = "Error getting metrics from TimelineMetricStore. " +

Reply via email to