AMBARI-14578. Refactor Collector logging for AMS. (swagle)

Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2c7aec60
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2c7aec60
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2c7aec60

Branch: refs/heads/branch-2.2
Commit: 2c7aec60899392f12803a7f846774e39879e1fb7
Parents: c6af019
Author: Siddharth Wagle <swa...@hortonworks.com>
Authored: Thu Jan 7 18:04:02 2016 -0800
Committer: Siddharth Wagle <swa...@hortonworks.com>
Committed: Thu Jan 7 18:04:02 2016 -0800

----------------------------------------------------------------------
 .../timeline/HBaseTimelineMetricStore.java      |   2 +
 .../timeline/TimelineMetricStoreWatcher.java    |   8 +-
 .../aggregators/AbstractTimelineAggregator.java |  34 +-
 .../TimelineMetricAggregatorFactory.java        |  22 +-
 .../TimelineMetricClusterAggregator.java        |   6 +-
 .../TimelineMetricClusterAggregatorSecond.java  |  10 +-
 .../TimelineMetricHostAggregator.java           |   9 +-
 .../v2/TimelineMetricClusterAggregator.java     |  23 +-
 .../v2/TimelineMetricHostAggregator.java        |  15 +-
 .../metrics/timeline/query/EmptyCondition.java  |   8 +
 .../timeline/AbstractMiniHBaseClusterTest.java  |  40 ++
 .../AbstractTimelineAggregatorTest.java         | 277 --------
 .../metrics/timeline/ITClusterAggregator.java   | 702 -------------------
 .../metrics/timeline/ITMetricAggregator.java    | 398 -----------
 .../metrics/timeline/TestClusterSuite.java      |   2 +
 .../AbstractTimelineAggregatorTest.java         | 276 ++++++++
 .../aggregators/ITClusterAggregator.java        | 677 ++++++++++++++++++
 .../aggregators/ITMetricAggregator.java         | 373 ++++++++++
 18 files changed, 1447 insertions(+), 1435 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index 1d654fd..c4e946a 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -148,6 +148,8 @@ public class HBaseTimelineMetricStore extends 
AbstractService implements Timelin
       executorService.scheduleWithFixedDelay(
         new TimelineMetricStoreWatcher(this, configuration), initDelay, delay,
         TimeUnit.SECONDS);
+      LOG.info("Started watchdog for timeline metrics store with initial " +
+        "delay = " + initDelay + ", delay = " + delay);
 
       isInitialized = true;
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcher.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcher.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcher.java
index 363b43a..632df3f 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcher.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcher.java
@@ -37,8 +37,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class TimelineMetricStoreWatcher implements Runnable {
 
-  private static final Log LOG = LogFactory
-    .getLog(TimelineMetricStoreWatcher.class);
+  private static final Log LOG = 
LogFactory.getLog(TimelineMetricStoreWatcher.class);
   private static final String FAKE_METRIC_NAME = 
"TimelineMetricStoreWatcher.FakeMetric";
   private static final String FAKE_HOSTNAME = "fakehostname";
   private static final String FAKE_APP_ID = "timeline_metric_store_watcher";
@@ -60,16 +59,13 @@ public class TimelineMetricStoreWatcher implements Runnable 
{
 
   @Override
   public void run() {
-
     if (checkMetricStore()) {
       failures = 0;
       if (LOG.isDebugEnabled()) {
         LOG.debug("Successfully got metrics from TimelineMetricStore");
       }
     } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Failed to get metrics from TimelineMetricStore");
-      }
+      LOG.info("Failed to get metrics from TimelineMetricStore, attempt = " + 
failures);
       failures++;
     }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
index 8bdddf2..fce5a39 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
@@ -18,14 +18,14 @@
 package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
 import java.io.File;
 import java.io.IOException;
 import java.sql.Connection;
@@ -43,12 +43,11 @@ import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
  */
 public abstract class AbstractTimelineAggregator implements 
TimelineMetricAggregator {
   protected final PhoenixHBaseAccessor hBaseAccessor;
-  private final Log LOG;
+  protected final Logger LOG;
   private Clock clock;
   protected final long checkpointDelayMillis;
   protected final Integer resultsetFetchSize;
   protected Configuration metricsConf;
-
   private String checkpointLocation;
   private Long sleepIntervalMillis;
   private Integer checkpointCutOffMultiplier;
@@ -56,24 +55,29 @@ public abstract class AbstractTimelineAggregator implements 
TimelineMetricAggreg
   protected String tableName;
   protected String outputTableName;
   protected Long nativeTimeRangeDelay;
+  // Explicitly name aggregators for logging needs
+  private final String aggregatorName;
 
-  public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
-                                    Configuration metricsConf, Clock clk) {
+  AbstractTimelineAggregator(String aggregatorName,
+                             PhoenixHBaseAccessor hBaseAccessor,
+                             Configuration metricsConf, Clock clk) {
+    this.aggregatorName = aggregatorName;
     this.hBaseAccessor = hBaseAccessor;
     this.metricsConf = metricsConf;
-    this.checkpointDelayMillis = SECONDS.toMillis(
-      metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120));
+    this.checkpointDelayMillis = 
SECONDS.toMillis(metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120));
     this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000);
-    this.LOG = LogFactory.getLog(this.getClass());
+    this.LOG = LoggerFactory.getLogger(aggregatorName);
     this.clock = clk;
   }
 
-  public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
-                                    Configuration metricsConf) {
-    this(hBaseAccessor, metricsConf, new SystemClock());
+  AbstractTimelineAggregator(String aggregatorName,
+                             PhoenixHBaseAccessor hBaseAccessor,
+                             Configuration metricsConf) {
+    this(aggregatorName, hBaseAccessor, metricsConf, new SystemClock());
   }
 
-  public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
+  public AbstractTimelineAggregator(String aggregatorName,
+                                    PhoenixHBaseAccessor hBaseAccessor,
                                     Configuration metricsConf,
                                     String checkpointLocation,
                                     Long sleepIntervalMillis,
@@ -82,7 +86,7 @@ public abstract class AbstractTimelineAggregator implements 
TimelineMetricAggreg
                                     String tableName,
                                     String outputTableName,
                                     Long nativeTimeRangeDelay) {
-    this(hBaseAccessor, metricsConf);
+    this(aggregatorName, hBaseAccessor, metricsConf);
     this.checkpointLocation = checkpointLocation;
     this.sleepIntervalMillis = sleepIntervalMillis;
     this.checkpointCutOffMultiplier = checkpointCutOffMultiplier;
@@ -199,7 +203,7 @@ public abstract class AbstractTimelineAggregator implements 
TimelineMetricAggreg
         }
       }
     } catch (IOException io) {
-      LOG.debug(io);
+      LOG.debug("", io);
     }
     return -1;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
index ba019fa..f0b2fda 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
@@ -103,6 +103,7 @@ public class TimelineMetricAggregatorFactory {
 
     if (useGroupByAggregator(metricsConf)) {
       return new 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
+        "TimelineMetricHostAggregatorMinute",
         hBaseAccessor, metricsConf,
         checkpointLocation,
         sleepIntervalMillis,
@@ -114,7 +115,9 @@ public class TimelineMetricAggregatorFactory {
       );
     }
 
-    return new TimelineMetricHostAggregator(hBaseAccessor, metricsConf,
+    return new TimelineMetricHostAggregator(
+      "TimelineMetricHostAggregatorMinute",
+      hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
       checkpointCutOffMultiplier,
@@ -147,6 +150,7 @@ public class TimelineMetricAggregatorFactory {
 
     if (useGroupByAggregator(metricsConf)) {
       return new 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
+        "TimelineMetricHostAggregatorHourly",
         hBaseAccessor, metricsConf,
         checkpointLocation,
         sleepIntervalMillis,
@@ -158,7 +162,9 @@ public class TimelineMetricAggregatorFactory {
       );
     }
 
-    return new TimelineMetricHostAggregator(hBaseAccessor, metricsConf,
+    return new TimelineMetricHostAggregator(
+      "TimelineMetricHostAggregatorHourly",
+      hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
       checkpointCutOffMultiplier,
@@ -191,6 +197,7 @@ public class TimelineMetricAggregatorFactory {
 
     if (useGroupByAggregator(metricsConf)) {
       return new 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
+        "TimelineMetricHostAggregatorDaily",
         hBaseAccessor, metricsConf,
         checkpointLocation,
         sleepIntervalMillis,
@@ -202,7 +209,9 @@ public class TimelineMetricAggregatorFactory {
       );
     }
 
-    return new TimelineMetricHostAggregator(hBaseAccessor, metricsConf,
+    return new TimelineMetricHostAggregator(
+      "TimelineMetricHostAggregatorDaily",
+      hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
       checkpointCutOffMultiplier,
@@ -241,6 +250,7 @@ public class TimelineMetricAggregatorFactory {
 
     // Second based aggregation have added responsibility of time slicing
     return new TimelineMetricClusterAggregatorSecond(
+      "TimelineClusterAggregatorSecond",
       hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -278,6 +288,7 @@ public class TimelineMetricAggregatorFactory {
 
     if (useGroupByAggregator(metricsConf)) {
       return new 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
+        "TimelineClusterAggregatorMinute",
         hBaseAccessor, metricsConf,
         checkpointLocation,
         sleepIntervalMillis,
@@ -290,6 +301,7 @@ public class TimelineMetricAggregatorFactory {
     }
 
     return new TimelineMetricClusterAggregator(
+      "TimelineClusterAggregatorMinute",
       hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -326,6 +338,7 @@ public class TimelineMetricAggregatorFactory {
 
     if (useGroupByAggregator(metricsConf)) {
       return new 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
+        "TimelineClusterAggregatorHourly",
         hBaseAccessor, metricsConf,
         checkpointLocation,
         sleepIntervalMillis,
@@ -338,6 +351,7 @@ public class TimelineMetricAggregatorFactory {
     }
 
     return new TimelineMetricClusterAggregator(
+      "TimelineClusterAggregatorHourly",
       hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,
@@ -374,6 +388,7 @@ public class TimelineMetricAggregatorFactory {
 
     if (useGroupByAggregator(metricsConf)) {
       return new 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
+        "TimelineClusterAggregatorDaily",
         hBaseAccessor, metricsConf,
         checkpointLocation,
         sleepIntervalMillis,
@@ -386,6 +401,7 @@ public class TimelineMetricAggregatorFactory {
     }
 
     return new TimelineMetricClusterAggregator(
+      "TimelineClusterAggregatorDaily",
       hBaseAccessor, metricsConf,
       checkpointLocation,
       sleepIntervalMillis,

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
index 9ed11e1..1c1c4b6 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -36,10 +36,10 @@ import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 
 public class TimelineMetricClusterAggregator extends 
AbstractTimelineAggregator {
   private final TimelineMetricReadHelper readHelper = new 
TimelineMetricReadHelper(true);
-  private static final Log LOG = 
LogFactory.getLog(TimelineMetricClusterAggregator.class);
   private final boolean isClusterPrecisionInputTable;
 
-  public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor,
+  public TimelineMetricClusterAggregator(String aggregatorName,
+                                         PhoenixHBaseAccessor hBaseAccessor,
                                          Configuration metricsConf,
                                          String checkpointLocation,
                                          Long sleepIntervalMillis,
@@ -48,7 +48,7 @@ public class TimelineMetricClusterAggregator extends 
AbstractTimelineAggregator
                                          String inputTableName,
                                          String outputTableName,
                                          Long nativeTimeRangeDelay) {
-    super(hBaseAccessor, metricsConf, checkpointLocation,
+    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier,
       hostAggregatorDisabledParam, inputTableName, outputTableName,
       nativeTimeRangeDelay);

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
index 1c7bf7f..4c96e5a 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -43,7 +43,6 @@ import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
  * the precision table and saves into the aggregate.
  */
 public class TimelineMetricClusterAggregatorSecond extends 
AbstractTimelineAggregator {
-  private static final Log LOG = 
LogFactory.getLog(TimelineMetricClusterAggregatorSecond.class);
   public Long timeSliceIntervalMillis;
   private TimelineMetricReadHelper timelineMetricReadHelper = new 
TimelineMetricReadHelper(true);
   // Aggregator to perform app-level aggregates for host metrics
@@ -51,7 +50,8 @@ public class TimelineMetricClusterAggregatorSecond extends 
AbstractTimelineAggre
   // 1 minute client side buffering adjustment
   private final Long serverTimeShiftAdjustment;
 
-  public TimelineMetricClusterAggregatorSecond(PhoenixHBaseAccessor 
hBaseAccessor,
+  public TimelineMetricClusterAggregatorSecond(String aggregatorName,
+                                               PhoenixHBaseAccessor 
hBaseAccessor,
                                                Configuration metricsConf,
                                                String checkpointLocation,
                                                Long sleepIntervalMillis,
@@ -61,9 +61,9 @@ public class TimelineMetricClusterAggregatorSecond extends 
AbstractTimelineAggre
                                                String outputTableName,
                                                Long nativeTimeRangeDelay,
                                                Long timeSliceInterval) {
-    super(hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis,
-      checkpointCutOffMultiplier, aggregatorDisabledParam, tableName,
-      outputTableName, nativeTimeRangeDelay);
+    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
+      sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam,
+      tableName, outputTableName, nativeTimeRangeDelay);
 
     appAggregator = new TimelineMetricAppAggregator(metricsConf);
     this.timeSliceIntervalMillis = timeSliceInterval;

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
index 37ddeb3..e0fa26e 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
@@ -36,7 +36,8 @@ public class TimelineMetricHostAggregator extends 
AbstractTimelineAggregator {
   private static final Log LOG = 
LogFactory.getLog(TimelineMetricHostAggregator.class);
   TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
 
-  public TimelineMetricHostAggregator(PhoenixHBaseAccessor hBaseAccessor,
+  public TimelineMetricHostAggregator(String aggregatorName,
+                                      PhoenixHBaseAccessor hBaseAccessor,
                                       Configuration metricsConf,
                                       String checkpointLocation,
                                       Long sleepIntervalMillis,
@@ -45,9 +46,9 @@ public class TimelineMetricHostAggregator extends 
AbstractTimelineAggregator {
                                       String tableName,
                                       String outputTableName,
                                       Long nativeTimeRangeDelay) {
-    super(hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis,
-      checkpointCutOffMultiplier, hostAggregatorDisabledParam, tableName,
-      outputTableName, nativeTimeRangeDelay);
+    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
+      sleepIntervalMillis, checkpointCutOffMultiplier, 
hostAggregatorDisabledParam,
+      tableName, outputTableName, nativeTimeRangeDelay);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
index ca95206..5257412 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
@@ -17,38 +17,25 @@
  */
 package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-
 import java.io.IOException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
 
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_APP_METRIC_GROUPBY_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_TIME_SQL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
 
 public class TimelineMetricClusterAggregator extends 
AbstractTimelineAggregator {
-  private static final Log LOG = 
LogFactory.getLog(TimelineMetricClusterAggregator.class);
   private final String aggregateColumnName;
 
-  public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor,
+  public TimelineMetricClusterAggregator(String aggregatorName,
+                                         PhoenixHBaseAccessor hBaseAccessor,
                                          Configuration metricsConf,
                                          String checkpointLocation,
                                          Long sleepIntervalMillis,
@@ -57,7 +44,7 @@ public class TimelineMetricClusterAggregator extends 
AbstractTimelineAggregator
                                          String inputTableName,
                                          String outputTableName,
                                          Long nativeTimeRangeDelay) {
-    super(hBaseAccessor, metricsConf, checkpointLocation,
+    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier,
       hostAggregatorDisabledParam, inputTableName, outputTableName,
       nativeTimeRangeDelay);
@@ -88,6 +75,10 @@ public class TimelineMetricClusterAggregator extends 
AbstractTimelineAggregator
       outputTableName, aggregateColumnName, tableName,
       startTime, endTime));
 
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Condition: " + condition.toString());
+    }
+
     return condition;
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
index 1e1712f..1c46642 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
@@ -34,9 +34,9 @@ import java.util.Date;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL;
 
 public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
-  private static final Log LOG = 
LogFactory.getLog(TimelineMetricHostAggregator.class);
 
-  public TimelineMetricHostAggregator(PhoenixHBaseAccessor hBaseAccessor,
+  public TimelineMetricHostAggregator(String aggregatorName,
+                                      PhoenixHBaseAccessor hBaseAccessor,
                                       Configuration metricsConf,
                                       String checkpointLocation,
                                       Long sleepIntervalMillis,
@@ -45,9 +45,9 @@ public class TimelineMetricHostAggregator extends 
AbstractTimelineAggregator {
                                       String tableName,
                                       String outputTableName,
                                       Long nativeTimeRangeDelay) {
-    super(hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis,
-      checkpointCutOffMultiplier, hostAggregatorDisabledParam, tableName,
-      outputTableName, nativeTimeRangeDelay);
+    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
+      sleepIntervalMillis, checkpointCutOffMultiplier, 
hostAggregatorDisabledParam,
+      tableName, outputTableName, nativeTimeRangeDelay);
   }
 
   @Override
@@ -67,7 +67,10 @@ public class TimelineMetricHostAggregator extends 
AbstractTimelineAggregator {
       PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, 
nativeTimeRangeDelay),
       outputTableName, tableName, startTime, endTime));
 
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Condition: " + condition.toString());
+    }
+
     return condition;
   }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java
index 30e3d4d..34174e2 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java
@@ -136,4 +136,12 @@ public class EmptyCondition implements Condition {
   public boolean doUpdate() {
     return doUpdate;
   }
+
+  @Override
+  public String toString() {
+    return "EmptyCondition{ " +
+      " statement = " + this.getStatement() +
+      " doUpdate = " + this.doUpdate() +
+      " }";
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
index a4d53b3..e73c741 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
@@ -25,6 +25,8 @@ import 
org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
@@ -32,6 +34,7 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -57,6 +60,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
 
   protected static final long BATCH_SIZE = 3;
+  protected Connection conn;
+  protected PhoenixHBaseAccessor hdb;
 
   @BeforeClass
   public static void doSetup() throws Exception {
@@ -75,6 +80,41 @@ public abstract class AbstractMiniHBaseClusterTest extends 
BaseTest {
     dropNonSystemTables();
   }
 
+  @Before
+  public void setUp() throws Exception {
+    
Logger.getLogger("org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline").setLevel(Level.DEBUG);
+    hdb = createTestableHBaseAccessor();
+    // inits connection, starts mini cluster
+    conn = getConnection(getUrl());
+
+    hdb.initMetricSchema();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    Connection conn = null;
+    Statement stmt = null;
+    try {
+      conn = getConnection(getUrl());
+      stmt = conn.createStatement();
+
+      stmt.execute("delete from METRIC_AGGREGATE");
+      stmt.execute("delete from METRIC_AGGREGATE_HOURLY");
+      stmt.execute("delete from METRIC_RECORD");
+      stmt.execute("delete from METRIC_RECORD_HOURLY");
+      stmt.execute("delete from METRIC_RECORD_MINUTE");
+      conn.commit();
+    } finally {
+      if (stmt != null) {
+        stmt.close();
+      }
+
+      if (conn != null) {
+        conn.close();
+      }
+    }
+  }
+
   @After
   public void cleanUpAfterTest() throws Exception {
     deletePriorTables(HConstants.LATEST_TIMESTAMP, getUrl());

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
deleted file mode 100644
index c22e734..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-import org.apache.hadoop.conf.Configuration;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import org.apache.hadoop.yarn.util.Clock;
-import org.junit.Before;
-import org.junit.Test;
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.concurrent.atomic.AtomicLong;
-import static junit.framework.Assert.assertEquals;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE;
-
-public class AbstractTimelineAggregatorTest {
-
-  private AbstractTimelineAggregator agg;
-  TestClock clock = new TestClock();
-
-  AtomicLong startTimeInDoWork;
-  AtomicLong endTimeInDoWork;
-  AtomicLong checkPoint;
-  int actualRuns;
-
-  long sleepIntervalMillis;
-  int checkpointCutOffMultiplier;
-
-  @Before
-  public void setUp() throws Exception {
-    sleepIntervalMillis = 30000l;
-    checkpointCutOffMultiplier = 2;
-
-    Configuration metricsConf = new Configuration();
-    metricsConf.setInt(AGGREGATOR_CHECKPOINT_DELAY, 0);
-    metricsConf.setInt(RESULTSET_FETCH_SIZE, 2000);
-
-    startTimeInDoWork = new AtomicLong(0);
-    endTimeInDoWork = new AtomicLong(0);
-    checkPoint = new AtomicLong(-1);
-    actualRuns = 0;
-
-    agg = new AbstractTimelineAggregator(
-      null, metricsConf, clock) {
-      @Override
-      public boolean doWork(long startTime, long endTime) {
-        startTimeInDoWork.set(startTime);
-        endTimeInDoWork.set(endTime);
-        actualRuns++;
-
-        return true;
-      }
-
-      @Override
-      protected Condition
-      prepareMetricQueryCondition(long startTime, long endTime) {
-        return null;
-      }
-
-      @Override
-      protected void aggregate(ResultSet rs, long startTime,
-                               long endTime) throws IOException, SQLException {
-      }
-
-      @Override
-      protected Long getSleepIntervalMillis() {
-        return sleepIntervalMillis;
-      }
-
-      @Override
-      protected Integer getCheckpointCutOffMultiplier() {
-        return checkpointCutOffMultiplier;
-      }
-
-      @Override
-      public boolean isDisabled() {
-        return false;
-      }
-
-      @Override
-      protected String getCheckpointLocation() {
-        return "dummy_ckptFile";
-      }
-
-      protected long readCheckPoint() {
-        return checkPoint.get();
-      }
-
-      @Override
-      protected void saveCheckPoint(long checkpointTime) throws IOException {
-        checkPoint.set(checkpointTime);
-      }
-    };
-
-
-  }
-
-  @Test
-  public void testDoWorkOnZeroDelay() throws Exception {
-
-    // starting at time 0;
-    clock.setTime(0);
-
-    long sleep = agg.runOnce(sleepIntervalMillis);
-    assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
-    assertEquals("endTime  should be zero", 0, endTimeInDoWork.get());
-    assertEquals(0, checkPoint.get());
-    assertEquals(sleep, sleepIntervalMillis);
-    assertEquals("Do not aggregate on first run", 0, actualRuns);
-
-    // exactly one sleepInterval
-    clock.setTime(clock.getTime() + sleepIntervalMillis);
-    sleep = agg.runOnce(sleepIntervalMillis);
-    assertEquals("startTime", clock.getTime() -
-        sleepIntervalMillis,
-      startTimeInDoWork.get());
-    assertEquals("endTime", clock.getTime(),
-      endTimeInDoWork.get());
-    assertEquals(clock.getTime(), checkPoint.get());
-    assertEquals(sleep, sleepIntervalMillis);
-    assertEquals(1, actualRuns);
-
-    // exactly one sleepInterval
-    clock.setTime(clock.getTime() + sleepIntervalMillis);
-    sleep = agg.runOnce(sleepIntervalMillis);
-    assertEquals("startTime", clock.getTime() -
-        sleepIntervalMillis,
-      startTimeInDoWork.get());
-    assertEquals("endTime", clock.getTime(),
-      endTimeInDoWork.get());
-    assertEquals(clock.getTime(), checkPoint.get());
-    assertEquals(sleep, sleepIntervalMillis);
-    assertEquals(2, actualRuns);
-
-    // checkpointCutOffMultiplier x sleepInterval - should pass,
-    // it will aggregate only first part of the whole 2x interval
-    // and sleep as usual (don't we need to skip some sleep?)
-    //
-    // effectively checkpoint will be one interval in the past,
-    // so next run will
-    clock.setTime(clock.getTime() + (checkpointCutOffMultiplier *
-      sleepIntervalMillis));
-    sleep = agg.runOnce(sleepIntervalMillis);
-    assertEquals("startTime after 2xinterval", clock.getTime() -
-        (checkpointCutOffMultiplier * sleepIntervalMillis),
-      startTimeInDoWork.get());
-    assertEquals("endTime after 2xinterval", clock.getTime() -
-        sleepIntervalMillis,
-      endTimeInDoWork.get());
-    assertEquals("checkpoint after 2xinterval", clock.getTime() -
-      sleepIntervalMillis, checkPoint.get());
-    assertEquals(sleep, sleepIntervalMillis);
-    assertEquals(3, actualRuns);
-
-    // exactly one sleepInterval after one that lagged by one whole interval,
-    // so it will do the previous one... and sleep as usual
-    // no way to keep up
-    clock.setTime(clock.getTime() + sleepIntervalMillis);
-    sleep = agg.runOnce(sleepIntervalMillis);
-    assertEquals("startTime ", clock.getTime() -
-        (checkpointCutOffMultiplier * sleepIntervalMillis),
-      startTimeInDoWork.get());
-    assertEquals("endTime  ", clock.getTime() -
-        sleepIntervalMillis,
-      endTimeInDoWork.get());
-    assertEquals("checkpoint ", clock.getTime() - sleepIntervalMillis,
-      checkPoint.get());
-    assertEquals(sleep, sleepIntervalMillis);
-    assertEquals(4, actualRuns);
-
-
-    // checkpointCutOffMultiplier x sleepInterval - in normal state should 
pass,
-    // but the clock lags too much, so this will not execute aggregation
-    // just update checkpoint to currentTime
-    clock.setTime(clock.getTime() + (checkpointCutOffMultiplier *
-      sleepIntervalMillis));
-    sleep = agg.runOnce(sleepIntervalMillis);
-    assertEquals(4, actualRuns);
-    assertEquals("checkpoint after too much lag is reset to " +
-        "current clock time",
-      clock.getTime(), checkPoint.get());
-    assertEquals(sleep, sleepIntervalMillis);
-  }
-
-  @Test
-  public void testDoWorkOnInterruptedRuns() throws Exception {
-    // start at some non-zero arbitrarily selected time;
-    int startingTime = 10000;
-
-    // 1.
-    clock.setTime(startingTime);
-    long timeOfFirstStep = clock.getTime();
-    long sleep = agg.runOnce(sleepIntervalMillis);
-    assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
-    assertEquals("endTime  should be zero", 0, endTimeInDoWork.get());
-    assertEquals("do not aggregate on first run", 0, actualRuns);
-    assertEquals("first checkpoint set on current time", timeOfFirstStep,
-      checkPoint.get());
-    assertEquals(sleep, sleepIntervalMillis);
-
-    // 2.
-    // the doWork was fast, and sleep was interrupted (e.g. restart)
-    // Q: do we want to aggregate just part of the system? maybe we should
-    // sleep up to next cycle start!!
-    clock.setTime(timeOfFirstStep + 1);
-    long timeOfSecondStep = clock.getTime();
-    sleep = agg.runOnce(sleepIntervalMillis);
-    assertEquals("startTime should be on previous checkpoint since it did not" 
+
-        " run yet",
-      timeOfFirstStep, startTimeInDoWork.get());
-
-    assertEquals("endTime can be start + interval",
-      startingTime + sleepIntervalMillis,
-      endTimeInDoWork.get());
-    assertEquals("should aggregate", 1, actualRuns);
-    assertEquals("checkpoint here should be set to min(endTime,currentTime), " 
+
-        "it is currentTime in our scenario",
-      timeOfSecondStep, checkPoint.get());
-
-    assertEquals(sleep, sleepIntervalMillis);
-
-    //3.
-    // and again not a full sleep passed, so only small part was aggregated
-    clock.setTime(startingTime + 2);
-    long timeOfThirdStep = clock.getTime();
-
-    sleep = agg.runOnce(sleepIntervalMillis);
-    // startTime and endTime are both be in the future, makes no sens,
-    // query will not work!!
-    assertEquals("startTime should be previous checkpoint",
-      timeOfSecondStep, startTimeInDoWork.get());
-
-    assertEquals("endTime  can be start + interval",
-      timeOfSecondStep + sleepIntervalMillis,
-      endTimeInDoWork.get());
-    assertEquals("should aggregate", 2, actualRuns);
-    assertEquals("checkpoint here should be set to min(endTime,currentTime), " 
+
-        "it is currentTime in our scenario",
-      timeOfThirdStep,
-      checkPoint.get());
-    assertEquals(sleep, sleepIntervalMillis);
-
-  }
-
-  private static class TestClock implements Clock {
-
-    private long time;
-
-    public void setTime(long time) {
-      this.time = time;
-    }
-
-    @Override
-    public long getTime() {
-      return time;
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
deleted file mode 100644
index 4ddecdc..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
+++ /dev/null
@@ -1,702 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-
-import junit.framework.Assert;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
-
-public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
-  private Connection conn;
-  private PhoenixHBaseAccessor hdb;
-  private final TimelineMetricReadHelper metricReader = new 
TimelineMetricReadHelper(false);
-
-  @Before
-  public void setUp() throws Exception {
-    
Logger.getLogger("org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline").setLevel(Level.DEBUG);
-    hdb = createTestableHBaseAccessor();
-    // inits connection, starts mini cluster
-    conn = getConnection(getUrl());
-
-    hdb.initMetricSchema();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    Connection conn = getConnection(getUrl());
-    Statement stmt = conn.createStatement();
-
-    stmt.execute("delete from METRIC_AGGREGATE");
-    stmt.execute("delete from METRIC_AGGREGATE_HOURLY");
-    stmt.execute("delete from METRIC_RECORD");
-    stmt.execute("delete from METRIC_RECORD_HOURLY");
-    stmt.execute("delete from METRIC_RECORD_MINUTE");
-    conn.commit();
-
-    stmt.close();
-    conn.close();
-  }
-
-  private Configuration getConfigurationForTest(boolean useGroupByAggregators) 
{
-    Configuration configuration = new Configuration();
-    configuration.set("timeline.metrics.service.use.groupBy.aggregators", 
String.valueOf(useGroupByAggregators));
-    return configuration;
-  }
-
-  @Test
-  public void testShouldAggregateClusterProperly() throws Exception {
-    // GIVEN
-    TimelineMetricAggregator agg =
-      
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, 
getConfigurationForTest(false));
-    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
-
-    long startTime = System.currentTimeMillis();
-    long ctime = startTime;
-    long minute = 60 * 1000;
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
-      "disk_free", 1));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
-      "disk_free", 2));
-    ctime += 2*minute;
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
-      "disk_free", 2));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
-      "disk_free", 1));
-
-    // WHEN
-    long endTime = ctime + minute;
-    boolean success = agg.doWork(startTime, endTime);
-
-    //THEN
-    Condition condition = new DefaultCondition(null, null, null, null, 
startTime,
-      endTime, null, null, true);
-    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
-      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, 
NATIVE_TIME_RANGE_DELTA),
-      METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
-
-    PreparedStatement pstmt = 
PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
-    ResultSet rs = pstmt.executeQuery();
-
-    int recordCount = 0;
-    while (rs.next()) {
-      TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
-      MetricClusterAggregate currentHostAggregate =
-        readHelper.getMetricClusterAggregateFromResultSet(rs);
-
-      if ("disk_free".equals(currentMetric.getMetricName())) {
-        assertEquals(2, currentHostAggregate.getNumberOfHosts());
-        assertEquals(2.0, currentHostAggregate.getMax());
-        assertEquals(1.0, currentHostAggregate.getMin());
-        assertEquals(3.0, currentHostAggregate.getSum());
-        recordCount++;
-      } else {
-        fail("Unexpected entry");
-      }
-    }
-  }
-
-  @Test
-  public void testShouldAggregateClusterIgnoringInstance() throws Exception {
-    // GIVEN
-    TimelineMetricAggregator agg =
-      
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, 
getConfigurationForTest(false));
-    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
-
-    long startTime = System.currentTimeMillis();
-    long ctime = startTime;
-    long minute = 60 * 1000 * 2;
-
-    /**
-     * Here we have two nodes with two instances each:
-     *              | local1 | local2 |
-     *  instance i1 |   1    |   2    |
-     *  instance i2 |   3    |   4    |
-     *
-     */
-    // Four 1's at ctime - 100
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local1",
-      "i1", "disk_free", 1));
-    // Four 2's at ctime - 100: different host
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local2",
-      "i1", "disk_free", 2));
-    // Avoid overwrite
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local1",
-      "i2", "disk_free", 3));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local2",
-      "i2", "disk_free", 4));
-
-    ctime += minute;
-
-    // Four 1's at ctime + 2 min
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local1",
-      "i1", "disk_free", 1));
-    // Four 1's at ctime + 2 min - different host
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local2",
-      "i1", "disk_free", 3));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local1",
-      "i2", "disk_free", 2));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local2",
-      "i2", "disk_free", 4));
-    // WHEN
-    long endTime = ctime + minute;
-    boolean success = agg.doWork(startTime - 1000, endTime + 1000);
-
-    //THEN
-    Condition condition = new DefaultCondition(null, null, null, null, 
startTime,
-      endTime, null, null, true);
-    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
-      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, 
NATIVE_TIME_RANGE_DELTA),
-      METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
-
-    PreparedStatement pstmt = 
PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
-    ResultSet rs = pstmt.executeQuery();
-
-    int recordCount = 0;
-    while (rs.next()) {
-      TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
-      MetricClusterAggregate currentHostAggregate =
-        readHelper.getMetricClusterAggregateFromResultSet(rs);
-
-      if ("disk_free".equals(currentMetric.getMetricName())) {
-        System.out.println("OUTPUT: " + currentMetric + " - " + 
currentHostAggregate);
-        assertEquals(2, currentHostAggregate.getNumberOfHosts());
-        assertEquals(5.0, Math.floor(currentHostAggregate.getSum()));
-        recordCount++;
-      } else {
-        fail("Unexpected entry");
-      }
-    }
-
-    Assert.assertEquals(5, recordCount);
-  }
-
-  @Test
-  public void testShouldAggregateDifferentMetricsOnClusterProperly() throws 
Exception {
-    // GIVEN
-    TimelineMetricAggregator agg =
-      
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, 
getConfigurationForTest(false));
-    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
-
-    // here we put some metrics tha will be aggregated
-    long startTime = System.currentTimeMillis();
-    long ctime = startTime;
-    long minute = 60 * 1000;
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
-      "disk_free", 1));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
-      "disk_free", 2));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
-      "disk_used", 1));
-
-    ctime += 2*minute;
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
-      "disk_free", 2));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
-      "disk_free", 1));
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
-      "disk_used", 1));
-
-    // WHEN
-    long endTime = ctime + minute;
-    boolean success = agg.doWork(startTime, endTime);
-
-    //THEN
-    Condition condition = new DefaultCondition(null, null, null, null, 
startTime,
-      endTime, null, null, true);
-    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
-      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, 
NATIVE_TIME_RANGE_DELTA),
-      METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
-
-    PreparedStatement pstmt = 
PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
-    ResultSet rs = pstmt.executeQuery();
-
-    int recordCount = 0;
-    while (rs.next()) {
-      TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
-      MetricClusterAggregate currentHostAggregate =
-        readHelper.getMetricClusterAggregateFromResultSet(rs);
-
-      if ("disk_free".equals(currentMetric.getMetricName())) {
-        assertEquals(2, currentHostAggregate.getNumberOfHosts());
-        assertEquals(2.0, currentHostAggregate.getMax());
-        assertEquals(1.0, currentHostAggregate.getMin());
-        assertEquals(3.0, currentHostAggregate.getSum());
-        recordCount++;
-      } else if ("disk_used".equals(currentMetric.getMetricName())) {
-        assertEquals(1, currentHostAggregate.getNumberOfHosts());
-        assertEquals(1.0, currentHostAggregate.getMax());
-        assertEquals(1.0, currentHostAggregate.getMin());
-        assertEquals(1.0, currentHostAggregate.getSum());
-        recordCount++;
-      } else {
-        fail("Unexpected entry");
-      }
-    }
-  }
-
-  @Test
-  public void testAggregateDailyClusterMetrics() throws Exception {
-    // GIVEN
-    TimelineMetricAggregator agg =
-      
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hdb, 
getConfigurationForTest(false));
-
-    // this time can be virtualized! or made independent from real clock
-    long startTime = System.currentTimeMillis();
-    long ctime = startTime;
-    long hour = 3600 * 1000;
-
-    Map<TimelineClusterMetric, MetricHostAggregate> records =
-      new HashMap<TimelineClusterMetric, MetricHostAggregate>();
-
-    records.put(createEmptyTimelineClusterMetric(ctime),
-      MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
-    records.put(createEmptyTimelineClusterMetric(ctime += hour),
-      MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
-    records.put(createEmptyTimelineClusterMetric(ctime += hour),
-      MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
-    records.put(createEmptyTimelineClusterMetric(ctime += hour),
-      MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0));
-
-
-    hdb.saveClusterTimeAggregateRecords(records, 
METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
-
-    // WHEN
-    agg.doWork(startTime, ctime + hour + 1000);
-
-    // THEN
-    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_DAILY");
-    int count = 0;
-    while (rs.next()) {
-      assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME"));
-      assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
-      assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
-      assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
-      assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
-      assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
-      count++;
-    }
-
-    assertEquals("Day aggregated row expected ", 1, count);
-  }
-
-  @Test
-  public void testShouldAggregateClusterOnMinuteProperly() throws Exception {
-
-    TimelineMetricAggregator agg =
-      
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, 
getConfigurationForTest(false));
-
-    long startTime = System.currentTimeMillis();
-    long ctime = startTime;
-    long second = 1000;
-    long minute = 60*second;
-
-    Map<TimelineClusterMetric, MetricClusterAggregate> records =
-      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
-
-    records.put(createEmptyTimelineClusterMetric(ctime),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(createEmptyTimelineClusterMetric(ctime += second),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(createEmptyTimelineClusterMetric(ctime += second),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(createEmptyTimelineClusterMetric(ctime += second),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-
-    hdb.saveClusterAggregateRecords(records);
-    agg.doWork(startTime, ctime + second);
-    long oldCtime = ctime + second;
-
-    //Next minute
-    ctime = startTime + minute;
-
-    records.put(createEmptyTimelineClusterMetric(ctime),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(createEmptyTimelineClusterMetric(ctime += second),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(createEmptyTimelineClusterMetric(ctime += second),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(createEmptyTimelineClusterMetric(ctime += second),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-
-    hdb.saveClusterAggregateRecords(records);
-    agg.doWork(oldCtime, ctime + second);
-
-    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_MINUTE");
-    int count = 0;
-    long diff = 0 ;
-    while (rs.next()) {
-      assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME"));
-      assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
-      assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
-      assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
-      assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
-      assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
-      if (count == 0) {
-        diff+=rs.getLong("SERVER_TIME");
-      } else {
-        diff-=rs.getLong("SERVER_TIME");
-        if (diff < 0) {
-          diff*=-1;
-        }
-        assertTrue(diff == minute);
-      }
-      count++;
-    }
-
-    assertEquals("One hourly aggregated row expected ", 2, count);
-  }
-
-  @Test
-  public void testShouldAggregateClusterOnHourProperly() throws Exception {
-    // GIVEN
-    TimelineMetricAggregator agg =
-      
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, 
getConfigurationForTest(false));
-
-    // this time can be virtualized! or made independent from real clock
-    long startTime = System.currentTimeMillis();
-    long ctime = startTime;
-    long minute = 60 * 1000;
-
-    Map<TimelineClusterMetric, MetricClusterAggregate> records =
-      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
-
-    records.put(createEmptyTimelineClusterMetric(ctime),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(createEmptyTimelineClusterMetric(ctime += minute),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(createEmptyTimelineClusterMetric(ctime += minute),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(createEmptyTimelineClusterMetric(ctime += minute),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-
-    hdb.saveClusterAggregateRecords(records);
-
-    // WHEN
-    agg.doWork(startTime, ctime + minute);
-
-    // THEN
-    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
-    int count = 0;
-    while (rs.next()) {
-      assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME"));
-      assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
-      assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
-      assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
-      assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
-      assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
-      count++;
-    }
-
-    assertEquals("One hourly aggregated row expected ", 1, count);
-  }
-
-  @Test
-  public void testShouldAggregateDifferentMetricsOnHourProperly() throws 
Exception {
-    // GIVEN
-    TimelineMetricAggregator agg =
-      
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, 
getConfigurationForTest(false));
-
-    long startTime = System.currentTimeMillis();
-    long ctime = startTime;
-    long minute = 60 * 1000;
-
-    Map<TimelineClusterMetric, MetricClusterAggregate> records =
-      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
-
-    records.put(createEmptyTimelineClusterMetric("disk_used", ctime),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
-      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
-
-    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
-      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
-
-    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
-      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
-
-    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
-      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
-
-    hdb.saveClusterAggregateRecords(records);
-
-    // WHEN
-    agg.doWork(startTime, ctime + minute);
-
-    // THEN
-    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
-    int count = 0;
-    while (rs.next()) {
-      if ("disk_used".equals(rs.getString("METRIC_NAME"))) {
-        assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
-        assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
-        assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
-        assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
-        assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
-      } else if ("disk_free".equals(rs.getString("METRIC_NAME"))) {
-        assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
-        assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
-        assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
-        assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX"));
-        assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN"));
-      }
-
-      count++;
-    }
-
-    assertEquals("Two hourly aggregated row expected ", 2, count);
-  }
-
-  @Test
-  public void testAppLevelHostMetricAggregates() throws Exception {
-    Configuration conf = getConfigurationForTest(false);
-    conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1");
-    TimelineMetricAggregator agg =
-      
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, 
conf);
-    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
-
-    long startTime = System.currentTimeMillis();
-    long ctime = startTime;
-    long minute = 60 * 1000;
-    hdb.insertMetricRecords(prepareSingleTimelineMetric((ctime), "local1",
-      "app1", null, "app_metric_random", 1));
-    ctime += 10;
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
-      "cpu_user", 1));
-    ctime += 10;
-    hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
-      "cpu_user", 2));
-
-    // WHEN
-    long endTime = ctime + minute;
-    boolean success = agg.doWork(startTime, endTime);
-
-    //THEN
-    Condition condition = new DefaultCondition(
-      Collections.singletonList("cpu_user"), null, "app1", null,
-      startTime, endTime, null, null, true);
-    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
-      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, 
NATIVE_TIME_RANGE_DELTA),
-      METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
-
-    PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
-      (conn, condition);
-    ResultSet rs = pstmt.executeQuery();
-
-    int recordCount = 0;
-    TimelineClusterMetric currentMetric = null;
-    MetricClusterAggregate currentHostAggregate = null;
-    while (rs.next()) {
-      currentMetric = metricReader.fromResultSet(rs);
-      currentHostAggregate = 
readHelper.getMetricClusterAggregateFromResultSet(rs);
-      recordCount++;
-    }
-    assertEquals(3, recordCount);
-    assertNotNull(currentMetric);
-    assertEquals("cpu_user", currentMetric.getMetricName());
-    assertEquals("app1", currentMetric.getAppId());
-    assertNotNull(currentHostAggregate);
-    assertEquals(1, currentHostAggregate.getNumberOfHosts());
-    assertEquals(1.0d, currentHostAggregate.getSum());
-  }
-
-  @Test
-  public void testClusterAggregateMetricNormalization() throws Exception {
-    TimelineMetricAggregator agg =
-      
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, 
getConfigurationForTest(false));
-    TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false);
-
-    // Sample data
-    TimelineMetric metric1 = new TimelineMetric();
-    metric1.setMetricName("yarn.ClusterMetrics.NumActiveNMs");
-    metric1.setAppId("resourcemanager");
-    metric1.setHostName("h1");
-    metric1.setStartTime(1431372311811l);
-    metric1.setMetricValues(new TreeMap<Long, Double>() {{
-      put(1431372311811l, 1.0);
-      put(1431372321811l, 1.0);
-      put(1431372331811l, 1.0);
-      put(1431372341811l, 1.0);
-      put(1431372351811l, 1.0);
-      put(1431372361811l, 1.0);
-      put(1431372371810l, 1.0);
-    }});
-
-    TimelineMetric metric2 = new TimelineMetric();
-    metric2.setMetricName("yarn.ClusterMetrics.NumActiveNMs");
-    metric2.setAppId("resourcemanager");
-    metric2.setHostName("h1");
-    metric2.setStartTime(1431372381810l);
-    metric2.setMetricValues(new TreeMap<Long, Double>() {{
-      put(1431372381810l, 1.0);
-      put(1431372391811l, 1.0);
-      put(1431372401811l, 1.0);
-      put(1431372411811l, 1.0);
-      put(1431372421811l, 1.0);
-      put(1431372431811l, 1.0);
-      put(1431372441810l, 1.0);
-    }});
-
-    TimelineMetrics metrics = new TimelineMetrics();
-    metrics.setMetrics(Collections.singletonList(metric1));
-    insertMetricRecords(conn, metrics, 1431372371810l);
-
-    metrics.setMetrics(Collections.singletonList(metric2));
-    insertMetricRecords(conn, metrics, 1431372441810l);
-
-    long startTime = 1431372055000l;
-    long endTime = 1431372655000l;
-
-    agg.doWork(startTime, endTime);
-
-    Condition condition = new DefaultCondition(null, null, null, null, 
startTime,
-      endTime, null, null, true);
-    condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
-      PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, 
NATIVE_TIME_RANGE_DELTA),
-      METRICS_CLUSTER_AGGREGATE_TABLE_NAME));
-
-    PreparedStatement pstmt = 
PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
-    ResultSet rs = pstmt.executeQuery();
-
-    int recordCount = 0;
-    while (rs.next()) {
-      TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs);
-      MetricClusterAggregate currentHostAggregate = 
readHelper.getMetricClusterAggregateFromResultSet(rs);
-
-      if 
("yarn.ClusterMetrics.NumActiveNMs".equals(currentMetric.getMetricName())) {
-        assertEquals(1, currentHostAggregate.getNumberOfHosts());
-        assertEquals(1.0, currentHostAggregate.getMax());
-        assertEquals(1.0, currentHostAggregate.getMin());
-        assertEquals(1.0, currentHostAggregate.getSum());
-        recordCount++;
-      } else {
-        fail("Unexpected entry");
-      }
-    }
-    Assert.assertEquals(5, recordCount);
-  }
-
-  @Test
-  public void testAggregationUsingGroupByQuery() throws Exception {
-    // GIVEN
-    TimelineMetricAggregator agg =
-      
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, 
getConfigurationForTest(true));
-
-    long startTime = System.currentTimeMillis();
-    long ctime = startTime;
-    long minute = 60 * 1000;
-
-    Map<TimelineClusterMetric, MetricClusterAggregate> records =
-      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
-
-    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
-      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
-
-    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
-      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
-
-    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
-      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
-
-    records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute),
-      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-    records.put(createEmptyTimelineClusterMetric("disk_free", ctime),
-      new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
-
-    hdb.saveClusterAggregateRecords(records);
-
-    // WHEN
-    agg.doWork(startTime, ctime + minute);
-
-    // THEN
-    ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
-    int count = 0;
-    while (rs.next()) {
-      if ("disk_used".equals(rs.getString("METRIC_NAME"))) {
-        assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
-        assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
-        assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
-        assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
-        assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
-      } else if ("disk_free".equals(rs.getString("METRIC_NAME"))) {
-        assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
-        assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
-        assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
-        assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX"));
-        assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN"));
-      }
-      count++;
-    }
-    assertEquals("Two hourly aggregated row expected ", 2, count);
-  }
-
-  private ResultSet executeQuery(String query) throws SQLException {
-    Connection conn = getConnection(getUrl());
-    Statement stmt = conn.createStatement();
-    return stmt.executeQuery(query);
-  }
-}

Reply via email to