AMBARI-14578. Refactor Collector logging for AMS. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2c7aec60 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2c7aec60 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2c7aec60 Branch: refs/heads/branch-2.2 Commit: 2c7aec60899392f12803a7f846774e39879e1fb7 Parents: c6af019 Author: Siddharth Wagle <swa...@hortonworks.com> Authored: Thu Jan 7 18:04:02 2016 -0800 Committer: Siddharth Wagle <swa...@hortonworks.com> Committed: Thu Jan 7 18:04:02 2016 -0800 ---------------------------------------------------------------------- .../timeline/HBaseTimelineMetricStore.java | 2 + .../timeline/TimelineMetricStoreWatcher.java | 8 +- .../aggregators/AbstractTimelineAggregator.java | 34 +- .../TimelineMetricAggregatorFactory.java | 22 +- .../TimelineMetricClusterAggregator.java | 6 +- .../TimelineMetricClusterAggregatorSecond.java | 10 +- .../TimelineMetricHostAggregator.java | 9 +- .../v2/TimelineMetricClusterAggregator.java | 23 +- .../v2/TimelineMetricHostAggregator.java | 15 +- .../metrics/timeline/query/EmptyCondition.java | 8 + .../timeline/AbstractMiniHBaseClusterTest.java | 40 ++ .../AbstractTimelineAggregatorTest.java | 277 -------- .../metrics/timeline/ITClusterAggregator.java | 702 ------------------- .../metrics/timeline/ITMetricAggregator.java | 398 ----------- .../metrics/timeline/TestClusterSuite.java | 2 + .../AbstractTimelineAggregatorTest.java | 276 ++++++++ .../aggregators/ITClusterAggregator.java | 677 ++++++++++++++++++ .../aggregators/ITMetricAggregator.java | 373 ++++++++++ 18 files changed, 1447 insertions(+), 1435 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java index 1d654fd..c4e946a 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -148,6 +148,8 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin executorService.scheduleWithFixedDelay( new TimelineMetricStoreWatcher(this, configuration), initDelay, delay, TimeUnit.SECONDS); + LOG.info("Started watchdog for timeline metrics store with initial " + + "delay = " + initDelay + ", delay = " + delay); isInitialized = true; } http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcher.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcher.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcher.java index 363b43a..632df3f 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcher.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcher.java @@ -37,8 +37,7 @@ import java.util.concurrent.TimeUnit; */ public class TimelineMetricStoreWatcher implements Runnable { - private static final Log LOG = LogFactory - .getLog(TimelineMetricStoreWatcher.class); + private static final Log LOG = LogFactory.getLog(TimelineMetricStoreWatcher.class); private static final String FAKE_METRIC_NAME = "TimelineMetricStoreWatcher.FakeMetric"; private static final String FAKE_HOSTNAME = "fakehostname"; private static final String FAKE_APP_ID = "timeline_metric_store_watcher"; @@ -60,16 +59,13 @@ public class TimelineMetricStoreWatcher implements Runnable { @Override public void run() { - if (checkMetricStore()) { failures = 0; if (LOG.isDebugEnabled()) { LOG.debug("Successfully got metrics from TimelineMetricStore"); } } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Failed to get metrics from TimelineMetricStore"); - } + LOG.info("Failed to get metrics from TimelineMetricStore, attempt = " + failures); failures++; } http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java index 8bdddf2..fce5a39 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java @@ -18,14 +18,14 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; import org.apache.commons.io.FileUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; import java.io.File; import java.io.IOException; import java.sql.Connection; @@ -43,12 +43,11 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti */ public abstract class AbstractTimelineAggregator implements TimelineMetricAggregator { protected final PhoenixHBaseAccessor hBaseAccessor; - private final Log LOG; + protected final Logger LOG; private Clock clock; protected final long checkpointDelayMillis; protected final Integer resultsetFetchSize; protected Configuration metricsConf; - private String checkpointLocation; private Long sleepIntervalMillis; private Integer checkpointCutOffMultiplier; @@ -56,24 +55,29 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg protected String tableName; protected String outputTableName; protected Long nativeTimeRangeDelay; + // Explicitly name aggregators for logging needs + private final String aggregatorName; - public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor, - Configuration metricsConf, Clock clk) { + AbstractTimelineAggregator(String aggregatorName, + PhoenixHBaseAccessor hBaseAccessor, + Configuration metricsConf, Clock clk) { + this.aggregatorName = aggregatorName; this.hBaseAccessor = hBaseAccessor; this.metricsConf = metricsConf; - this.checkpointDelayMillis = SECONDS.toMillis( - metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120)); + this.checkpointDelayMillis = SECONDS.toMillis(metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120)); this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000); - this.LOG = LogFactory.getLog(this.getClass()); + this.LOG = LoggerFactory.getLogger(aggregatorName); this.clock = clk; } - public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor, - Configuration metricsConf) { - this(hBaseAccessor, metricsConf, new SystemClock()); + AbstractTimelineAggregator(String aggregatorName, + PhoenixHBaseAccessor hBaseAccessor, + Configuration metricsConf) { + this(aggregatorName, hBaseAccessor, metricsConf, new SystemClock()); } - public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor, + public AbstractTimelineAggregator(String aggregatorName, + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, Long sleepIntervalMillis, @@ -82,7 +86,7 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg String tableName, String outputTableName, Long nativeTimeRangeDelay) { - this(hBaseAccessor, metricsConf); + this(aggregatorName, hBaseAccessor, metricsConf); this.checkpointLocation = checkpointLocation; this.sleepIntervalMillis = sleepIntervalMillis; this.checkpointCutOffMultiplier = checkpointCutOffMultiplier; @@ -199,7 +203,7 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg } } } catch (IOException io) { - LOG.debug(io); + LOG.debug("", io); } return -1; } http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java index ba019fa..f0b2fda 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java @@ -103,6 +103,7 @@ public class TimelineMetricAggregatorFactory { if (useGroupByAggregator(metricsConf)) { return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator( + "TimelineMetricHostAggregatorMinute", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -114,7 +115,9 @@ public class TimelineMetricAggregatorFactory { ); } - return new TimelineMetricHostAggregator(hBaseAccessor, metricsConf, + return new TimelineMetricHostAggregator( + "TimelineMetricHostAggregatorMinute", + hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, @@ -147,6 +150,7 @@ public class TimelineMetricAggregatorFactory { if (useGroupByAggregator(metricsConf)) { return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator( + "TimelineMetricHostAggregatorHourly", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -158,7 +162,9 @@ public class TimelineMetricAggregatorFactory { ); } - return new TimelineMetricHostAggregator(hBaseAccessor, metricsConf, + return new TimelineMetricHostAggregator( + "TimelineMetricHostAggregatorHourly", + hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, @@ -191,6 +197,7 @@ public class TimelineMetricAggregatorFactory { if (useGroupByAggregator(metricsConf)) { return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator( + "TimelineMetricHostAggregatorDaily", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -202,7 +209,9 @@ public class TimelineMetricAggregatorFactory { ); } - return new TimelineMetricHostAggregator(hBaseAccessor, metricsConf, + return new TimelineMetricHostAggregator( + "TimelineMetricHostAggregatorDaily", + hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, @@ -241,6 +250,7 @@ public class TimelineMetricAggregatorFactory { // Second based aggregation have added responsibility of time slicing return new TimelineMetricClusterAggregatorSecond( + "TimelineClusterAggregatorSecond", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -278,6 +288,7 @@ public class TimelineMetricAggregatorFactory { if (useGroupByAggregator(metricsConf)) { return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator( + "TimelineClusterAggregatorMinute", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -290,6 +301,7 @@ public class TimelineMetricAggregatorFactory { } return new TimelineMetricClusterAggregator( + "TimelineClusterAggregatorMinute", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -326,6 +338,7 @@ public class TimelineMetricAggregatorFactory { if (useGroupByAggregator(metricsConf)) { return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator( + "TimelineClusterAggregatorHourly", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -338,6 +351,7 @@ public class TimelineMetricAggregatorFactory { } return new TimelineMetricClusterAggregator( + "TimelineClusterAggregatorHourly", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -374,6 +388,7 @@ public class TimelineMetricAggregatorFactory { if (useGroupByAggregator(metricsConf)) { return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator( + "TimelineClusterAggregatorDaily", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -386,6 +401,7 @@ public class TimelineMetricAggregatorFactory { } return new TimelineMetricClusterAggregator( + "TimelineClusterAggregatorDaily", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java index 9ed11e1..1c1c4b6 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java @@ -36,10 +36,10 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator { private final TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(true); - private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregator.class); private final boolean isClusterPrecisionInputTable; - public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor, + public TimelineMetricClusterAggregator(String aggregatorName, + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, Long sleepIntervalMillis, @@ -48,7 +48,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator String inputTableName, String outputTableName, Long nativeTimeRangeDelay) { - super(hBaseAccessor, metricsConf, checkpointLocation, + super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam, inputTableName, outputTableName, nativeTimeRangeDelay); http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java index 1c7bf7f..4c96e5a 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java @@ -43,7 +43,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti * the precision table and saves into the aggregate. */ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggregator { - private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregatorSecond.class); public Long timeSliceIntervalMillis; private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true); // Aggregator to perform app-level aggregates for host metrics @@ -51,7 +50,8 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre // 1 minute client side buffering adjustment private final Long serverTimeShiftAdjustment; - public TimelineMetricClusterAggregatorSecond(PhoenixHBaseAccessor hBaseAccessor, + public TimelineMetricClusterAggregatorSecond(String aggregatorName, + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, Long sleepIntervalMillis, @@ -61,9 +61,9 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre String outputTableName, Long nativeTimeRangeDelay, Long timeSliceInterval) { - super(hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, - checkpointCutOffMultiplier, aggregatorDisabledParam, tableName, - outputTableName, nativeTimeRangeDelay); + super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, + sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam, + tableName, outputTableName, nativeTimeRangeDelay); appAggregator = new TimelineMetricAppAggregator(metricsConf); this.timeSliceIntervalMillis = timeSliceInterval; http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java index 37ddeb3..e0fa26e 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java @@ -36,7 +36,8 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { private static final Log LOG = LogFactory.getLog(TimelineMetricHostAggregator.class); TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); - public TimelineMetricHostAggregator(PhoenixHBaseAccessor hBaseAccessor, + public TimelineMetricHostAggregator(String aggregatorName, + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, Long sleepIntervalMillis, @@ -45,9 +46,9 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { String tableName, String outputTableName, Long nativeTimeRangeDelay) { - super(hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, - checkpointCutOffMultiplier, hostAggregatorDisabledParam, tableName, - outputTableName, nativeTimeRangeDelay); + super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, + sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam, + tableName, outputTableName, nativeTimeRangeDelay); } @Override http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java index ca95206..5257412 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java @@ -17,38 +17,25 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; - import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Date; -import java.util.HashMap; -import java.util.Map; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_APP_METRIC_GROUPBY_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_TIME_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator { - private static final Log LOG = LogFactory.getLog(TimelineMetricClusterAggregator.class); private final String aggregateColumnName; - public TimelineMetricClusterAggregator(PhoenixHBaseAccessor hBaseAccessor, + public TimelineMetricClusterAggregator(String aggregatorName, + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, Long sleepIntervalMillis, @@ -57,7 +44,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator String inputTableName, String outputTableName, Long nativeTimeRangeDelay) { - super(hBaseAccessor, metricsConf, checkpointLocation, + super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam, inputTableName, outputTableName, nativeTimeRangeDelay); @@ -88,6 +75,10 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator outputTableName, aggregateColumnName, tableName, startTime, endTime)); + if (LOG.isDebugEnabled()) { + LOG.debug("Condition: " + condition.toString()); + } + return condition; } http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java index 1e1712f..1c46642 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java @@ -34,9 +34,9 @@ import java.util.Date; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL; public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { - private static final Log LOG = LogFactory.getLog(TimelineMetricHostAggregator.class); - public TimelineMetricHostAggregator(PhoenixHBaseAccessor hBaseAccessor, + public TimelineMetricHostAggregator(String aggregatorName, + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, Long sleepIntervalMillis, @@ -45,9 +45,9 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { String tableName, String outputTableName, Long nativeTimeRangeDelay) { - super(hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, - checkpointCutOffMultiplier, hostAggregatorDisabledParam, tableName, - outputTableName, nativeTimeRangeDelay); + super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, + sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam, + tableName, outputTableName, nativeTimeRangeDelay); } @Override @@ -67,7 +67,10 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay), outputTableName, tableName, startTime, endTime)); + if (LOG.isDebugEnabled()) { + LOG.debug("Condition: " + condition.toString()); + } + return condition; } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java index 30e3d4d..34174e2 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java @@ -136,4 +136,12 @@ public class EmptyCondition implements Condition { public boolean doUpdate() { return doUpdate; } + + @Override + public String toString() { + return "EmptyCondition{ " + + " statement = " + this.getStatement() + + " doUpdate = " + this.doUpdate() + + " }"; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java index a4d53b3..e73c741 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java @@ -25,6 +25,8 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConnectionProvider; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; import org.apache.phoenix.hbase.index.write.IndexWriterUtils; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; @@ -32,6 +34,7 @@ import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.junit.After; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -57,6 +60,8 @@ import static org.assertj.core.api.Assertions.assertThat; public abstract class AbstractMiniHBaseClusterTest extends BaseTest { protected static final long BATCH_SIZE = 3; + protected Connection conn; + protected PhoenixHBaseAccessor hdb; @BeforeClass public static void doSetup() throws Exception { @@ -75,6 +80,41 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest { dropNonSystemTables(); } + @Before + public void setUp() throws Exception { + Logger.getLogger("org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline").setLevel(Level.DEBUG); + hdb = createTestableHBaseAccessor(); + // inits connection, starts mini cluster + conn = getConnection(getUrl()); + + hdb.initMetricSchema(); + } + + @After + public void tearDown() throws Exception { + Connection conn = null; + Statement stmt = null; + try { + conn = getConnection(getUrl()); + stmt = conn.createStatement(); + + stmt.execute("delete from METRIC_AGGREGATE"); + stmt.execute("delete from METRIC_AGGREGATE_HOURLY"); + stmt.execute("delete from METRIC_RECORD"); + stmt.execute("delete from METRIC_RECORD_HOURLY"); + stmt.execute("delete from METRIC_RECORD_MINUTE"); + conn.commit(); + } finally { + if (stmt != null) { + stmt.close(); + } + + if (conn != null) { + conn.close(); + } + } + } + @After public void cleanUpAfterTest() throws Exception { deletePriorTables(HConstants.LATEST_TIMESTAMP, getUrl()); http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java deleted file mode 100644 index c22e734..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregatorTest.java +++ /dev/null @@ -1,277 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; -import org.apache.hadoop.yarn.util.Clock; -import org.junit.Before; -import org.junit.Test; -import java.io.IOException; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.concurrent.atomic.AtomicLong; -import static junit.framework.Assert.assertEquals; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE; - -public class AbstractTimelineAggregatorTest { - - private AbstractTimelineAggregator agg; - TestClock clock = new TestClock(); - - AtomicLong startTimeInDoWork; - AtomicLong endTimeInDoWork; - AtomicLong checkPoint; - int actualRuns; - - long sleepIntervalMillis; - int checkpointCutOffMultiplier; - - @Before - public void setUp() throws Exception { - sleepIntervalMillis = 30000l; - checkpointCutOffMultiplier = 2; - - Configuration metricsConf = new Configuration(); - metricsConf.setInt(AGGREGATOR_CHECKPOINT_DELAY, 0); - metricsConf.setInt(RESULTSET_FETCH_SIZE, 2000); - - startTimeInDoWork = new AtomicLong(0); - endTimeInDoWork = new AtomicLong(0); - checkPoint = new AtomicLong(-1); - actualRuns = 0; - - agg = new AbstractTimelineAggregator( - null, metricsConf, clock) { - @Override - public boolean doWork(long startTime, long endTime) { - startTimeInDoWork.set(startTime); - endTimeInDoWork.set(endTime); - actualRuns++; - - return true; - } - - @Override - protected Condition - prepareMetricQueryCondition(long startTime, long endTime) { - return null; - } - - @Override - protected void aggregate(ResultSet rs, long startTime, - long endTime) throws IOException, SQLException { - } - - @Override - protected Long getSleepIntervalMillis() { - return sleepIntervalMillis; - } - - @Override - protected Integer getCheckpointCutOffMultiplier() { - return checkpointCutOffMultiplier; - } - - @Override - public boolean isDisabled() { - return false; - } - - @Override - protected String getCheckpointLocation() { - return "dummy_ckptFile"; - } - - protected long readCheckPoint() { - return checkPoint.get(); - } - - @Override - protected void saveCheckPoint(long checkpointTime) throws IOException { - checkPoint.set(checkpointTime); - } - }; - - - } - - @Test - public void testDoWorkOnZeroDelay() throws Exception { - - // starting at time 0; - clock.setTime(0); - - long sleep = agg.runOnce(sleepIntervalMillis); - assertEquals("startTime should be zero", 0, startTimeInDoWork.get()); - assertEquals("endTime should be zero", 0, endTimeInDoWork.get()); - assertEquals(0, checkPoint.get()); - assertEquals(sleep, sleepIntervalMillis); - assertEquals("Do not aggregate on first run", 0, actualRuns); - - // exactly one sleepInterval - clock.setTime(clock.getTime() + sleepIntervalMillis); - sleep = agg.runOnce(sleepIntervalMillis); - assertEquals("startTime", clock.getTime() - - sleepIntervalMillis, - startTimeInDoWork.get()); - assertEquals("endTime", clock.getTime(), - endTimeInDoWork.get()); - assertEquals(clock.getTime(), checkPoint.get()); - assertEquals(sleep, sleepIntervalMillis); - assertEquals(1, actualRuns); - - // exactly one sleepInterval - clock.setTime(clock.getTime() + sleepIntervalMillis); - sleep = agg.runOnce(sleepIntervalMillis); - assertEquals("startTime", clock.getTime() - - sleepIntervalMillis, - startTimeInDoWork.get()); - assertEquals("endTime", clock.getTime(), - endTimeInDoWork.get()); - assertEquals(clock.getTime(), checkPoint.get()); - assertEquals(sleep, sleepIntervalMillis); - assertEquals(2, actualRuns); - - // checkpointCutOffMultiplier x sleepInterval - should pass, - // it will aggregate only first part of the whole 2x interval - // and sleep as usual (don't we need to skip some sleep?) - // - // effectively checkpoint will be one interval in the past, - // so next run will - clock.setTime(clock.getTime() + (checkpointCutOffMultiplier * - sleepIntervalMillis)); - sleep = agg.runOnce(sleepIntervalMillis); - assertEquals("startTime after 2xinterval", clock.getTime() - - (checkpointCutOffMultiplier * sleepIntervalMillis), - startTimeInDoWork.get()); - assertEquals("endTime after 2xinterval", clock.getTime() - - sleepIntervalMillis, - endTimeInDoWork.get()); - assertEquals("checkpoint after 2xinterval", clock.getTime() - - sleepIntervalMillis, checkPoint.get()); - assertEquals(sleep, sleepIntervalMillis); - assertEquals(3, actualRuns); - - // exactly one sleepInterval after one that lagged by one whole interval, - // so it will do the previous one... and sleep as usual - // no way to keep up - clock.setTime(clock.getTime() + sleepIntervalMillis); - sleep = agg.runOnce(sleepIntervalMillis); - assertEquals("startTime ", clock.getTime() - - (checkpointCutOffMultiplier * sleepIntervalMillis), - startTimeInDoWork.get()); - assertEquals("endTime ", clock.getTime() - - sleepIntervalMillis, - endTimeInDoWork.get()); - assertEquals("checkpoint ", clock.getTime() - sleepIntervalMillis, - checkPoint.get()); - assertEquals(sleep, sleepIntervalMillis); - assertEquals(4, actualRuns); - - - // checkpointCutOffMultiplier x sleepInterval - in normal state should pass, - // but the clock lags too much, so this will not execute aggregation - // just update checkpoint to currentTime - clock.setTime(clock.getTime() + (checkpointCutOffMultiplier * - sleepIntervalMillis)); - sleep = agg.runOnce(sleepIntervalMillis); - assertEquals(4, actualRuns); - assertEquals("checkpoint after too much lag is reset to " + - "current clock time", - clock.getTime(), checkPoint.get()); - assertEquals(sleep, sleepIntervalMillis); - } - - @Test - public void testDoWorkOnInterruptedRuns() throws Exception { - // start at some non-zero arbitrarily selected time; - int startingTime = 10000; - - // 1. - clock.setTime(startingTime); - long timeOfFirstStep = clock.getTime(); - long sleep = agg.runOnce(sleepIntervalMillis); - assertEquals("startTime should be zero", 0, startTimeInDoWork.get()); - assertEquals("endTime should be zero", 0, endTimeInDoWork.get()); - assertEquals("do not aggregate on first run", 0, actualRuns); - assertEquals("first checkpoint set on current time", timeOfFirstStep, - checkPoint.get()); - assertEquals(sleep, sleepIntervalMillis); - - // 2. - // the doWork was fast, and sleep was interrupted (e.g. restart) - // Q: do we want to aggregate just part of the system? maybe we should - // sleep up to next cycle start!! - clock.setTime(timeOfFirstStep + 1); - long timeOfSecondStep = clock.getTime(); - sleep = agg.runOnce(sleepIntervalMillis); - assertEquals("startTime should be on previous checkpoint since it did not" + - " run yet", - timeOfFirstStep, startTimeInDoWork.get()); - - assertEquals("endTime can be start + interval", - startingTime + sleepIntervalMillis, - endTimeInDoWork.get()); - assertEquals("should aggregate", 1, actualRuns); - assertEquals("checkpoint here should be set to min(endTime,currentTime), " + - "it is currentTime in our scenario", - timeOfSecondStep, checkPoint.get()); - - assertEquals(sleep, sleepIntervalMillis); - - //3. - // and again not a full sleep passed, so only small part was aggregated - clock.setTime(startingTime + 2); - long timeOfThirdStep = clock.getTime(); - - sleep = agg.runOnce(sleepIntervalMillis); - // startTime and endTime are both be in the future, makes no sens, - // query will not work!! - assertEquals("startTime should be previous checkpoint", - timeOfSecondStep, startTimeInDoWork.get()); - - assertEquals("endTime can be start + interval", - timeOfSecondStep + sleepIntervalMillis, - endTimeInDoWork.get()); - assertEquals("should aggregate", 2, actualRuns); - assertEquals("checkpoint here should be set to min(endTime,currentTime), " + - "it is currentTime in our scenario", - timeOfThirdStep, - checkPoint.get()); - assertEquals(sleep, sleepIntervalMillis); - - } - - private static class TestClock implements Clock { - - private long time; - - public void setTime(long time) { - this.time = time; - } - - @Override - public long getTime() { - return time; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java deleted file mode 100644 index 4ddecdc..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java +++ /dev/null @@ -1,702 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - - -import junit.framework.Assert; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.TreeMap; - -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertNotNull; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.fail; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; - -public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { - private Connection conn; - private PhoenixHBaseAccessor hdb; - private final TimelineMetricReadHelper metricReader = new TimelineMetricReadHelper(false); - - @Before - public void setUp() throws Exception { - Logger.getLogger("org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline").setLevel(Level.DEBUG); - hdb = createTestableHBaseAccessor(); - // inits connection, starts mini cluster - conn = getConnection(getUrl()); - - hdb.initMetricSchema(); - } - - @After - public void tearDown() throws Exception { - Connection conn = getConnection(getUrl()); - Statement stmt = conn.createStatement(); - - stmt.execute("delete from METRIC_AGGREGATE"); - stmt.execute("delete from METRIC_AGGREGATE_HOURLY"); - stmt.execute("delete from METRIC_RECORD"); - stmt.execute("delete from METRIC_RECORD_HOURLY"); - stmt.execute("delete from METRIC_RECORD_MINUTE"); - conn.commit(); - - stmt.close(); - conn.close(); - } - - private Configuration getConfigurationForTest(boolean useGroupByAggregators) { - Configuration configuration = new Configuration(); - configuration.set("timeline.metrics.service.use.groupBy.aggregators", String.valueOf(useGroupByAggregators)); - return configuration; - } - - @Test - public void testShouldAggregateClusterProperly() throws Exception { - // GIVEN - TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false)); - TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); - - long startTime = System.currentTimeMillis(); - long ctime = startTime; - long minute = 60 * 1000; - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", - "disk_free", 1)); - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", - "disk_free", 2)); - ctime += 2*minute; - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", - "disk_free", 2)); - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", - "disk_free", 1)); - - // WHEN - long endTime = ctime + minute; - boolean success = agg.doWork(startTime, endTime); - - //THEN - Condition condition = new DefaultCondition(null, null, null, null, startTime, - endTime, null, null, true); - condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, - PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), - METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); - - PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); - ResultSet rs = pstmt.executeQuery(); - - int recordCount = 0; - while (rs.next()) { - TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs); - MetricClusterAggregate currentHostAggregate = - readHelper.getMetricClusterAggregateFromResultSet(rs); - - if ("disk_free".equals(currentMetric.getMetricName())) { - assertEquals(2, currentHostAggregate.getNumberOfHosts()); - assertEquals(2.0, currentHostAggregate.getMax()); - assertEquals(1.0, currentHostAggregate.getMin()); - assertEquals(3.0, currentHostAggregate.getSum()); - recordCount++; - } else { - fail("Unexpected entry"); - } - } - } - - @Test - public void testShouldAggregateClusterIgnoringInstance() throws Exception { - // GIVEN - TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false)); - TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); - - long startTime = System.currentTimeMillis(); - long ctime = startTime; - long minute = 60 * 1000 * 2; - - /** - * Here we have two nodes with two instances each: - * | local1 | local2 | - * instance i1 | 1 | 2 | - * instance i2 | 3 | 4 | - * - */ - // Four 1's at ctime - 100 - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local1", - "i1", "disk_free", 1)); - // Four 2's at ctime - 100: different host - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local2", - "i1", "disk_free", 2)); - // Avoid overwrite - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local1", - "i2", "disk_free", 3)); - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local2", - "i2", "disk_free", 4)); - - ctime += minute; - - // Four 1's at ctime + 2 min - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local1", - "i1", "disk_free", 1)); - // Four 1's at ctime + 2 min - different host - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local2", - "i1", "disk_free", 3)); - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local1", - "i2", "disk_free", 2)); - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local2", - "i2", "disk_free", 4)); - // WHEN - long endTime = ctime + minute; - boolean success = agg.doWork(startTime - 1000, endTime + 1000); - - //THEN - Condition condition = new DefaultCondition(null, null, null, null, startTime, - endTime, null, null, true); - condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, - PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), - METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); - - PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); - ResultSet rs = pstmt.executeQuery(); - - int recordCount = 0; - while (rs.next()) { - TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs); - MetricClusterAggregate currentHostAggregate = - readHelper.getMetricClusterAggregateFromResultSet(rs); - - if ("disk_free".equals(currentMetric.getMetricName())) { - System.out.println("OUTPUT: " + currentMetric + " - " + currentHostAggregate); - assertEquals(2, currentHostAggregate.getNumberOfHosts()); - assertEquals(5.0, Math.floor(currentHostAggregate.getSum())); - recordCount++; - } else { - fail("Unexpected entry"); - } - } - - Assert.assertEquals(5, recordCount); - } - - @Test - public void testShouldAggregateDifferentMetricsOnClusterProperly() throws Exception { - // GIVEN - TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false)); - TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); - - // here we put some metrics tha will be aggregated - long startTime = System.currentTimeMillis(); - long ctime = startTime; - long minute = 60 * 1000; - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", - "disk_free", 1)); - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", - "disk_free", 2)); - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", - "disk_used", 1)); - - ctime += 2*minute; - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", - "disk_free", 2)); - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", - "disk_free", 1)); - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", - "disk_used", 1)); - - // WHEN - long endTime = ctime + minute; - boolean success = agg.doWork(startTime, endTime); - - //THEN - Condition condition = new DefaultCondition(null, null, null, null, startTime, - endTime, null, null, true); - condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, - PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), - METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); - - PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); - ResultSet rs = pstmt.executeQuery(); - - int recordCount = 0; - while (rs.next()) { - TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs); - MetricClusterAggregate currentHostAggregate = - readHelper.getMetricClusterAggregateFromResultSet(rs); - - if ("disk_free".equals(currentMetric.getMetricName())) { - assertEquals(2, currentHostAggregate.getNumberOfHosts()); - assertEquals(2.0, currentHostAggregate.getMax()); - assertEquals(1.0, currentHostAggregate.getMin()); - assertEquals(3.0, currentHostAggregate.getSum()); - recordCount++; - } else if ("disk_used".equals(currentMetric.getMetricName())) { - assertEquals(1, currentHostAggregate.getNumberOfHosts()); - assertEquals(1.0, currentHostAggregate.getMax()); - assertEquals(1.0, currentHostAggregate.getMin()); - assertEquals(1.0, currentHostAggregate.getSum()); - recordCount++; - } else { - fail("Unexpected entry"); - } - } - } - - @Test - public void testAggregateDailyClusterMetrics() throws Exception { - // GIVEN - TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hdb, getConfigurationForTest(false)); - - // this time can be virtualized! or made independent from real clock - long startTime = System.currentTimeMillis(); - long ctime = startTime; - long hour = 3600 * 1000; - - Map<TimelineClusterMetric, MetricHostAggregate> records = - new HashMap<TimelineClusterMetric, MetricHostAggregate>(); - - records.put(createEmptyTimelineClusterMetric(ctime), - MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0)); - records.put(createEmptyTimelineClusterMetric(ctime += hour), - MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0)); - records.put(createEmptyTimelineClusterMetric(ctime += hour), - MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0)); - records.put(createEmptyTimelineClusterMetric(ctime += hour), - MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0)); - - - hdb.saveClusterTimeAggregateRecords(records, METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME); - - // WHEN - agg.doWork(startTime, ctime + hour + 1000); - - // THEN - ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_DAILY"); - int count = 0; - while (rs.next()) { - assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME")); - assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); - assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM")); - assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); - assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); - assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); - count++; - } - - assertEquals("Day aggregated row expected ", 1, count); - } - - @Test - public void testShouldAggregateClusterOnMinuteProperly() throws Exception { - - TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false)); - - long startTime = System.currentTimeMillis(); - long ctime = startTime; - long second = 1000; - long minute = 60*second; - - Map<TimelineClusterMetric, MetricClusterAggregate> records = - new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); - - records.put(createEmptyTimelineClusterMetric(ctime), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric(ctime += second), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric(ctime += second), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric(ctime += second), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - - hdb.saveClusterAggregateRecords(records); - agg.doWork(startTime, ctime + second); - long oldCtime = ctime + second; - - //Next minute - ctime = startTime + minute; - - records.put(createEmptyTimelineClusterMetric(ctime), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric(ctime += second), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric(ctime += second), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric(ctime += second), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - - hdb.saveClusterAggregateRecords(records); - agg.doWork(oldCtime, ctime + second); - - ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_MINUTE"); - int count = 0; - long diff = 0 ; - while (rs.next()) { - assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME")); - assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); - assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM")); - assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); - assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); - assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); - if (count == 0) { - diff+=rs.getLong("SERVER_TIME"); - } else { - diff-=rs.getLong("SERVER_TIME"); - if (diff < 0) { - diff*=-1; - } - assertTrue(diff == minute); - } - count++; - } - - assertEquals("One hourly aggregated row expected ", 2, count); - } - - @Test - public void testShouldAggregateClusterOnHourProperly() throws Exception { - // GIVEN - TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false)); - - // this time can be virtualized! or made independent from real clock - long startTime = System.currentTimeMillis(); - long ctime = startTime; - long minute = 60 * 1000; - - Map<TimelineClusterMetric, MetricClusterAggregate> records = - new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); - - records.put(createEmptyTimelineClusterMetric(ctime), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric(ctime += minute), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric(ctime += minute), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric(ctime += minute), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - - hdb.saveClusterAggregateRecords(records); - - // WHEN - agg.doWork(startTime, ctime + minute); - - // THEN - ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY"); - int count = 0; - while (rs.next()) { - assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME")); - assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); - assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM")); - assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); - assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); - assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); - count++; - } - - assertEquals("One hourly aggregated row expected ", 1, count); - } - - @Test - public void testShouldAggregateDifferentMetricsOnHourProperly() throws Exception { - // GIVEN - TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false)); - - long startTime = System.currentTimeMillis(); - long ctime = startTime; - long minute = 60 * 1000; - - Map<TimelineClusterMetric, MetricClusterAggregate> records = - new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); - - records.put(createEmptyTimelineClusterMetric("disk_used", ctime), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric("disk_free", ctime), - new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); - - records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric("disk_free", ctime), - new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); - - records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric("disk_free", ctime), - new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); - - records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric("disk_free", ctime), - new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); - - hdb.saveClusterAggregateRecords(records); - - // WHEN - agg.doWork(startTime, ctime + minute); - - // THEN - ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY"); - int count = 0; - while (rs.next()) { - if ("disk_used".equals(rs.getString("METRIC_NAME"))) { - assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); - assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM")); - assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); - assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); - assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); - } else if ("disk_free".equals(rs.getString("METRIC_NAME"))) { - assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); - assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM")); - assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); - assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX")); - assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN")); - } - - count++; - } - - assertEquals("Two hourly aggregated row expected ", 2, count); - } - - @Test - public void testAppLevelHostMetricAggregates() throws Exception { - Configuration conf = getConfigurationForTest(false); - conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1"); - TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, conf); - TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); - - long startTime = System.currentTimeMillis(); - long ctime = startTime; - long minute = 60 * 1000; - hdb.insertMetricRecords(prepareSingleTimelineMetric((ctime), "local1", - "app1", null, "app_metric_random", 1)); - ctime += 10; - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", - "cpu_user", 1)); - ctime += 10; - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", - "cpu_user", 2)); - - // WHEN - long endTime = ctime + minute; - boolean success = agg.doWork(startTime, endTime); - - //THEN - Condition condition = new DefaultCondition( - Collections.singletonList("cpu_user"), null, "app1", null, - startTime, endTime, null, null, true); - condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, - PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), - METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); - - PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt - (conn, condition); - ResultSet rs = pstmt.executeQuery(); - - int recordCount = 0; - TimelineClusterMetric currentMetric = null; - MetricClusterAggregate currentHostAggregate = null; - while (rs.next()) { - currentMetric = metricReader.fromResultSet(rs); - currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs); - recordCount++; - } - assertEquals(3, recordCount); - assertNotNull(currentMetric); - assertEquals("cpu_user", currentMetric.getMetricName()); - assertEquals("app1", currentMetric.getAppId()); - assertNotNull(currentHostAggregate); - assertEquals(1, currentHostAggregate.getNumberOfHosts()); - assertEquals(1.0d, currentHostAggregate.getSum()); - } - - @Test - public void testClusterAggregateMetricNormalization() throws Exception { - TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false)); - TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); - - // Sample data - TimelineMetric metric1 = new TimelineMetric(); - metric1.setMetricName("yarn.ClusterMetrics.NumActiveNMs"); - metric1.setAppId("resourcemanager"); - metric1.setHostName("h1"); - metric1.setStartTime(1431372311811l); - metric1.setMetricValues(new TreeMap<Long, Double>() {{ - put(1431372311811l, 1.0); - put(1431372321811l, 1.0); - put(1431372331811l, 1.0); - put(1431372341811l, 1.0); - put(1431372351811l, 1.0); - put(1431372361811l, 1.0); - put(1431372371810l, 1.0); - }}); - - TimelineMetric metric2 = new TimelineMetric(); - metric2.setMetricName("yarn.ClusterMetrics.NumActiveNMs"); - metric2.setAppId("resourcemanager"); - metric2.setHostName("h1"); - metric2.setStartTime(1431372381810l); - metric2.setMetricValues(new TreeMap<Long, Double>() {{ - put(1431372381810l, 1.0); - put(1431372391811l, 1.0); - put(1431372401811l, 1.0); - put(1431372411811l, 1.0); - put(1431372421811l, 1.0); - put(1431372431811l, 1.0); - put(1431372441810l, 1.0); - }}); - - TimelineMetrics metrics = new TimelineMetrics(); - metrics.setMetrics(Collections.singletonList(metric1)); - insertMetricRecords(conn, metrics, 1431372371810l); - - metrics.setMetrics(Collections.singletonList(metric2)); - insertMetricRecords(conn, metrics, 1431372441810l); - - long startTime = 1431372055000l; - long endTime = 1431372655000l; - - agg.doWork(startTime, endTime); - - Condition condition = new DefaultCondition(null, null, null, null, startTime, - endTime, null, null, true); - condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, - PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), - METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); - - PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); - ResultSet rs = pstmt.executeQuery(); - - int recordCount = 0; - while (rs.next()) { - TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs); - MetricClusterAggregate currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs); - - if ("yarn.ClusterMetrics.NumActiveNMs".equals(currentMetric.getMetricName())) { - assertEquals(1, currentHostAggregate.getNumberOfHosts()); - assertEquals(1.0, currentHostAggregate.getMax()); - assertEquals(1.0, currentHostAggregate.getMin()); - assertEquals(1.0, currentHostAggregate.getSum()); - recordCount++; - } else { - fail("Unexpected entry"); - } - } - Assert.assertEquals(5, recordCount); - } - - @Test - public void testAggregationUsingGroupByQuery() throws Exception { - // GIVEN - TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(true)); - - long startTime = System.currentTimeMillis(); - long ctime = startTime; - long minute = 60 * 1000; - - Map<TimelineClusterMetric, MetricClusterAggregate> records = - new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); - - records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric("disk_free", ctime), - new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); - - records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric("disk_free", ctime), - new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); - - records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric("disk_free", ctime), - new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); - - records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric("disk_free", ctime), - new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); - - hdb.saveClusterAggregateRecords(records); - - // WHEN - agg.doWork(startTime, ctime + minute); - - // THEN - ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY"); - int count = 0; - while (rs.next()) { - if ("disk_used".equals(rs.getString("METRIC_NAME"))) { - assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); - assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM")); - assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); - assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); - assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); - } else if ("disk_free".equals(rs.getString("METRIC_NAME"))) { - assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); - assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM")); - assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); - assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX")); - assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN")); - } - count++; - } - assertEquals("Two hourly aggregated row expected ", 2, count); - } - - private ResultSet executeQuery(String query) throws SQLException { - Connection conn = getConnection(getUrl()); - Statement stmt = conn.createStatement(); - return stmt.executeQuery(query); - } -}