Repository: ambari Updated Branches: refs/heads/branch-2.2 c6af0197b -> 2c7aec608
http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java deleted file mode 100644 index e9c25cf..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java +++ /dev/null @@ -1,398 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.Statement; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Map; -import java.util.TreeMap; - -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.fail; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineMetric; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL; -import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; -import static org.assertj.core.api.Assertions.assertThat; - -public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { - private Connection conn; - private PhoenixHBaseAccessor hdb; - - @Before - public void setUp() throws Exception { - Logger.getLogger("org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline").setLevel(Level.DEBUG); - hdb = createTestableHBaseAccessor(); - // inits connection, starts mini cluster - conn = getConnection(getUrl()); - - hdb.initMetricSchema(); - } - - @After - public void tearDown() throws Exception { - Connection conn = getConnection(getUrl()); - Statement stmt = conn.createStatement(); - - stmt.execute("delete from METRIC_AGGREGATE"); - stmt.execute("delete from METRIC_AGGREGATE_HOURLY"); - stmt.execute("delete from METRIC_RECORD"); - stmt.execute("delete from METRIC_RECORD_HOURLY"); - stmt.execute("delete from METRIC_RECORD_MINUTE"); - conn.commit(); - - stmt.close(); - conn.close(); - } - - @Test - public void testShouldInsertMetrics() throws Exception { - // GIVEN - - // WHEN - long startTime = System.currentTimeMillis(); - TimelineMetrics metricsSent = prepareTimelineMetrics(startTime, "local"); - hdb.insertMetricRecords(metricsSent); - - Condition queryCondition = new DefaultCondition(null, - Collections.singletonList("local"), null, null, startTime, - startTime + (15 * 60 * 1000), null, null, false); - TimelineMetrics recordRead = 
hdb.getMetricRecords(queryCondition, null); - - // THEN - assertThat(recordRead.getMetrics()).hasSize(2) - .extracting("metricName") - .containsOnly("mem_free", "disk_free"); - - assertThat(metricsSent.getMetrics()) - .usingElementComparator(TIME_IGNORING_COMPARATOR) - .containsExactlyElementsOf(recordRead.getMetrics()); - } - - private Configuration getConfigurationForTest(boolean useGroupByAggregators) { - Configuration configuration = new Configuration(); - configuration.set("timeline.metrics.service.use.groupBy.aggregators", String.valueOf(useGroupByAggregators)); - return configuration; - } - - @Test - public void testShouldAggregateMinuteProperly() throws Exception { - // GIVEN - TimelineMetricAggregator aggregatorMinute = - TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb, - getConfigurationForTest(false)); - TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); - - long startTime = System.currentTimeMillis(); - long ctime = startTime; - long minute = 60 * 1000; - hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local")); - hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local")); - hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local")); - hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local")); - hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local")); - - // WHEN - long endTime = startTime + 1000 * 60 * 4; - boolean success = aggregatorMinute.doWork(startTime, endTime); - - //THEN - Condition condition = new DefaultCondition(null, null, null, null, startTime, - endTime, null, null, true); - condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, - PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), - METRICS_AGGREGATE_MINUTE_TABLE_NAME)); - - PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); - ResultSet rs = pstmt.executeQuery(); - MetricHostAggregate 
expectedAggregate = - MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0); - - int count = 0; - while (rs.next()) { - TimelineMetric currentMetric = - readHelper.getTimelineMetricKeyFromResultSet(rs); - MetricHostAggregate currentHostAggregate = - readHelper.getMetricHostAggregateFromResultSet(rs); - - if ("disk_free".equals(currentMetric.getMetricName())) { - assertEquals(2.0, currentHostAggregate.getMax()); - assertEquals(0.0, currentHostAggregate.getMin()); - assertEquals(20, currentHostAggregate.getNumberOfSamples()); - assertEquals(15.0, currentHostAggregate.getSum()); - assertEquals(15.0 / 20, currentHostAggregate.getAvg()); - count++; - } else if ("mem_free".equals(currentMetric.getMetricName())) { - assertEquals(2.0, currentHostAggregate.getMax()); - assertEquals(0.0, currentHostAggregate.getMin()); - assertEquals(20, currentHostAggregate.getNumberOfSamples()); - assertEquals(15.0, currentHostAggregate.getSum()); - assertEquals(15.0 / 20, currentHostAggregate.getAvg()); - count++; - } else { - fail("Unexpected entry"); - } - } - assertEquals("Two aggregated entries expected", 2, count); - } - - @Test - public void testShouldAggregateHourProperly() throws Exception { - // GIVEN - TimelineMetricAggregator aggregator = - TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb, - getConfigurationForTest(false)); - TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); - long startTime = System.currentTimeMillis(); - - MetricHostAggregate expectedAggregate = - MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0); - Map<TimelineMetric, MetricHostAggregate> - aggMap = new HashMap<TimelineMetric, - MetricHostAggregate>(); - - int min_5 = 5 * 60 * 1000; - long ctime = startTime - min_5; - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), 
expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - - hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_MINUTE_TABLE_NAME); - - //WHEN - long endTime = ctime + min_5; - boolean success = aggregator.doWork(startTime, endTime); - assertTrue(success); - - //THEN - Condition condition = new DefaultCondition(null, null, null, null, startTime, - endTime, null, null, true); - condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, - PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), - METRICS_AGGREGATE_HOURLY_TABLE_NAME)); - - PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); - ResultSet rs = pstmt.executeQuery(); - - while (rs.next()) { - TimelineMetric currentMetric = - readHelper.getTimelineMetricKeyFromResultSet(rs); - MetricHostAggregate currentHostAggregate = - readHelper.getMetricHostAggregateFromResultSet(rs); - - if ("disk_used".equals(currentMetric.getMetricName())) { - assertEquals(2.0, currentHostAggregate.getMax()); - assertEquals(0.0, currentHostAggregate.getMin()); - assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples()); - assertEquals(12 * 15.0, currentHostAggregate.getSum()); - assertEquals(15.0 / 20, currentHostAggregate.getAvg()); - } - } - } - - @Test - public void testMetricAggregateDaily() throws Exception { - // 
GIVEN - TimelineMetricAggregator aggregator = - TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hdb, - getConfigurationForTest(false)); - TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); - long startTime = System.currentTimeMillis(); - - MetricHostAggregate expectedAggregate = - MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0); - Map<TimelineMetric, MetricHostAggregate> - aggMap = new HashMap<TimelineMetric, MetricHostAggregate>(); - - int min_5 = 5 * 60 * 1000; - long ctime = startTime - min_5; - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); - - hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_HOURLY_TABLE_NAME); - - //WHEN - long endTime = ctime + min_5; - boolean success = aggregator.doWork(startTime, endTime); - assertTrue(success); - - //THEN - Condition condition = new DefaultCondition(null, null, null, null, startTime, - endTime, null, null, true); - condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, - PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), - METRICS_AGGREGATE_DAILY_TABLE_NAME)); - - 
PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); - ResultSet rs = pstmt.executeQuery(); - - while (rs.next()) { - TimelineMetric currentMetric = - readHelper.getTimelineMetricKeyFromResultSet(rs); - MetricHostAggregate currentHostAggregate = - readHelper.getMetricHostAggregateFromResultSet(rs); - - if ("disk_used".equals(currentMetric.getMetricName())) { - assertEquals(2.0, currentHostAggregate.getMax()); - assertEquals(0.0, currentHostAggregate.getMin()); - assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples()); - assertEquals(12 * 15.0, currentHostAggregate.getSum()); - assertEquals(15.0 / 20, currentHostAggregate.getAvg()); - } - } - } - - @Test - public void testAggregationUsingGroupByQuery() throws Exception { - // GIVEN - TimelineMetricAggregator aggregatorMinute = - TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb, - getConfigurationForTest(true)); - TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); - - long startTime = System.currentTimeMillis(); - long ctime = startTime; - long minute = 60 * 1000; - hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local")); - hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local")); - hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local")); - hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local")); - hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local")); - - long endTime = startTime + 1000 * 60 * 4; - boolean success = aggregatorMinute.doWork(startTime, endTime); - assertTrue(success); - - Condition condition = new DefaultCondition(null, null, null, null, startTime, - endTime, null, null, true); - condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, - PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), - METRICS_AGGREGATE_MINUTE_TABLE_NAME)); - - PreparedStatement pstmt = 
PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); - ResultSet rs = pstmt.executeQuery(); - MetricHostAggregate expectedAggregate = - MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0); - - int count = 0; - while (rs.next()) { - TimelineMetric currentMetric = - readHelper.getTimelineMetricKeyFromResultSet(rs); - MetricHostAggregate currentHostAggregate = - readHelper.getMetricHostAggregateFromResultSet(rs); - - if ("disk_free".equals(currentMetric.getMetricName())) { - assertEquals(2.0, currentHostAggregate.getMax()); - assertEquals(0.0, currentHostAggregate.getMin()); - assertEquals(20, currentHostAggregate.getNumberOfSamples()); - assertEquals(15.0, currentHostAggregate.getSum()); - assertEquals(15.0 / 20, currentHostAggregate.getAvg()); - count++; - } else if ("mem_free".equals(currentMetric.getMetricName())) { - assertEquals(2.0, currentHostAggregate.getMax()); - assertEquals(0.0, currentHostAggregate.getMin()); - assertEquals(20, currentHostAggregate.getNumberOfSamples()); - assertEquals(15.0, currentHostAggregate.getSum()); - assertEquals(15.0 / 20, currentHostAggregate.getAvg()); - count++; - } else { - fail("Unexpected entry"); - } - } - assertEquals("Two aggregated entries expected", 2, count); - } - - private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR = - new Comparator<TimelineMetric>() { - @Override - public int compare(TimelineMetric o1, TimelineMetric o2) { - return o1.equalsExceptTime(o2) ? 
0 : 1; - } - }; - - private TimelineMetrics prepareTimelineMetrics(long startTime, String host) { - TimelineMetrics metrics = new TimelineMetrics(); - metrics.setMetrics(Arrays.asList( - createMetric(startTime, "disk_free", host), - createMetric(startTime, "mem_free", host))); - - return metrics; - } - - private TimelineMetric createMetric(long startTime, String metricName, String host) { - TimelineMetric m = new TimelineMetric(); - m.setAppId("host"); - m.setHostName(host); - m.setMetricName(metricName); - m.setStartTime(startTime); - TreeMap<Long, Double> vals = new TreeMap<Long, Double>(); - vals.put(startTime + 15000l, 0.0); - vals.put(startTime + 30000l, 0.0); - vals.put(startTime + 45000l, 1.0); - vals.put(startTime + 60000l, 2.0); - - m.setMetricValues(vals); - - return m; - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java index 8405b49..1fc5b0e 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.ITClusterAggregator; +import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.ITMetricAggregator; import org.junit.Ignore; import org.junit.runner.RunWith; import org.junit.runners.Suite; http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java new file mode 100644 index 0000000..2b29469 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; +import org.apache.hadoop.yarn.util.Clock; +import org.junit.Before; +import org.junit.Test; +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.concurrent.atomic.AtomicLong; +import static junit.framework.Assert.assertEquals; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE; + +public class AbstractTimelineAggregatorTest { + + private AbstractTimelineAggregator agg; + TestClock clock = new TestClock(); + + AtomicLong startTimeInDoWork; + AtomicLong endTimeInDoWork; + AtomicLong checkPoint; + int actualRuns; + + long sleepIntervalMillis; + int checkpointCutOffMultiplier; + + @Before + public void setUp() throws Exception { + sleepIntervalMillis = 30000l; + checkpointCutOffMultiplier = 2; + + Configuration metricsConf = new Configuration(); + metricsConf.setInt(AGGREGATOR_CHECKPOINT_DELAY, 0); + metricsConf.setInt(RESULTSET_FETCH_SIZE, 2000); + + startTimeInDoWork = new AtomicLong(0); + endTimeInDoWork = new AtomicLong(0); + checkPoint = new AtomicLong(-1); + actualRuns = 0; + + agg = new AbstractTimelineAggregator("TimelineAggregatorTest", null, metricsConf, clock) { + @Override + public boolean doWork(long startTime, long endTime) { + startTimeInDoWork.set(startTime); + endTimeInDoWork.set(endTime); + actualRuns++; + 
+ return true; + } + + @Override + protected Condition + prepareMetricQueryCondition(long startTime, long endTime) { + return null; + } + + @Override + protected void aggregate(ResultSet rs, long startTime, + long endTime) throws IOException, SQLException { + } + + @Override + protected Long getSleepIntervalMillis() { + return sleepIntervalMillis; + } + + @Override + protected Integer getCheckpointCutOffMultiplier() { + return checkpointCutOffMultiplier; + } + + @Override + public boolean isDisabled() { + return false; + } + + @Override + protected String getCheckpointLocation() { + return "dummy_ckptFile"; + } + + protected long readCheckPoint() { + return checkPoint.get(); + } + + @Override + protected void saveCheckPoint(long checkpointTime) throws IOException { + checkPoint.set(checkpointTime); + } + }; + + + } + + @Test + public void testDoWorkOnZeroDelay() throws Exception { + + // starting at time 0; + clock.setTime(0); + + long sleep = agg.runOnce(sleepIntervalMillis); + assertEquals("startTime should be zero", 0, startTimeInDoWork.get()); + assertEquals("endTime should be zero", 0, endTimeInDoWork.get()); + assertEquals(0, checkPoint.get()); + assertEquals(sleep, sleepIntervalMillis); + assertEquals("Do not aggregate on first run", 0, actualRuns); + + // exactly one sleepInterval + clock.setTime(clock.getTime() + sleepIntervalMillis); + sleep = agg.runOnce(sleepIntervalMillis); + assertEquals("startTime", clock.getTime() - + sleepIntervalMillis, + startTimeInDoWork.get()); + assertEquals("endTime", clock.getTime(), + endTimeInDoWork.get()); + assertEquals(clock.getTime(), checkPoint.get()); + assertEquals(sleep, sleepIntervalMillis); + assertEquals(1, actualRuns); + + // exactly one sleepInterval + clock.setTime(clock.getTime() + sleepIntervalMillis); + sleep = agg.runOnce(sleepIntervalMillis); + assertEquals("startTime", clock.getTime() - + sleepIntervalMillis, + startTimeInDoWork.get()); + assertEquals("endTime", clock.getTime(), + 
endTimeInDoWork.get()); + assertEquals(clock.getTime(), checkPoint.get()); + assertEquals(sleep, sleepIntervalMillis); + assertEquals(2, actualRuns); + + // checkpointCutOffMultiplier x sleepInterval - should pass, + // it will aggregate only first part of the whole 2x interval + // and sleep as usual (don't we need to skip some sleep?) + // + // effectively checkpoint will be one interval in the past, + // so next run will aggregate the remaining part and stay one interval behind + clock.setTime(clock.getTime() + (checkpointCutOffMultiplier * + sleepIntervalMillis)); + sleep = agg.runOnce(sleepIntervalMillis); + assertEquals("startTime after 2xinterval", clock.getTime() - + (checkpointCutOffMultiplier * sleepIntervalMillis), + startTimeInDoWork.get()); + assertEquals("endTime after 2xinterval", clock.getTime() - + sleepIntervalMillis, + endTimeInDoWork.get()); + assertEquals("checkpoint after 2xinterval", clock.getTime() - + sleepIntervalMillis, checkPoint.get()); + assertEquals(sleep, sleepIntervalMillis); + assertEquals(3, actualRuns); + + // exactly one sleepInterval after one that lagged by one whole interval, + // so it will do the previous one... 
and sleep as usual + // no way to keep up + clock.setTime(clock.getTime() + sleepIntervalMillis); + sleep = agg.runOnce(sleepIntervalMillis); + assertEquals("startTime ", clock.getTime() - + (checkpointCutOffMultiplier * sleepIntervalMillis), + startTimeInDoWork.get()); + assertEquals("endTime ", clock.getTime() - + sleepIntervalMillis, + endTimeInDoWork.get()); + assertEquals("checkpoint ", clock.getTime() - sleepIntervalMillis, + checkPoint.get()); + assertEquals(sleep, sleepIntervalMillis); + assertEquals(4, actualRuns); + + + // checkpointCutOffMultiplier x sleepInterval - in normal state should pass, + // but the clock lags too much, so this will not execute aggregation + // just update checkpoint to currentTime + clock.setTime(clock.getTime() + (checkpointCutOffMultiplier * + sleepIntervalMillis)); + sleep = agg.runOnce(sleepIntervalMillis); + assertEquals(4, actualRuns); + assertEquals("checkpoint after too much lag is reset to " + + "current clock time", + clock.getTime(), checkPoint.get()); + assertEquals(sleep, sleepIntervalMillis); + } + + @Test + public void testDoWorkOnInterruptedRuns() throws Exception { + // start at some non-zero arbitrarily selected time; + int startingTime = 10000; + + // 1. + clock.setTime(startingTime); + long timeOfFirstStep = clock.getTime(); + long sleep = agg.runOnce(sleepIntervalMillis); + assertEquals("startTime should be zero", 0, startTimeInDoWork.get()); + assertEquals("endTime should be zero", 0, endTimeInDoWork.get()); + assertEquals("do not aggregate on first run", 0, actualRuns); + assertEquals("first checkpoint set on current time", timeOfFirstStep, + checkPoint.get()); + assertEquals(sleep, sleepIntervalMillis); + + // 2. + // the doWork was fast, and sleep was interrupted (e.g. restart) + // Q: do we want to aggregate just part of the system? maybe we should + // sleep up to next cycle start!! 
+ clock.setTime(timeOfFirstStep + 1); + long timeOfSecondStep = clock.getTime(); + sleep = agg.runOnce(sleepIntervalMillis); + assertEquals("startTime should be on previous checkpoint since it did not" + + " run yet", + timeOfFirstStep, startTimeInDoWork.get()); + + assertEquals("endTime can be start + interval", + startingTime + sleepIntervalMillis, + endTimeInDoWork.get()); + assertEquals("should aggregate", 1, actualRuns); + assertEquals("checkpoint here should be set to min(endTime,currentTime), " + + "it is currentTime in our scenario", + timeOfSecondStep, checkPoint.get()); + + assertEquals(sleep, sleepIntervalMillis); + + //3. + // and again not a full sleep passed, so only small part was aggregated + clock.setTime(startingTime + 2); + long timeOfThirdStep = clock.getTime(); + + sleep = agg.runOnce(sleepIntervalMillis); + // startTime is the previous checkpoint but endTime is in the future, which makes no sense, + // the query will not work!! + assertEquals("startTime should be previous checkpoint", + timeOfSecondStep, startTimeInDoWork.get()); + + assertEquals("endTime can be start + interval", + timeOfSecondStep + sleepIntervalMillis, + endTimeInDoWork.get()); + assertEquals("should aggregate", 2, actualRuns); + assertEquals("checkpoint here should be set to min(endTime,currentTime), " + + "it is currentTime in our scenario", + timeOfThirdStep, + checkPoint.get()); + assertEquals(sleep, sleepIntervalMillis); + + } + + private static class TestClock implements Clock { + + private long time; + + public void setTime(long time) { + this.time = time; + } + + @Override + public long getTime() { + return time; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java new file mode 100644 index 0000000..6672dae --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java @@ -0,0 +1,677 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; + + +import junit.framework.Assert; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collections; +import java.util.HashMap; 
+import java.util.Map; +import java.util.TreeMap; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.fail; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; + +public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { + private final TimelineMetricReadHelper metricReader = new TimelineMetricReadHelper(false); + + private Configuration getConfigurationForTest(boolean useGroupByAggregators) { + Configuration configuration = new Configuration(); + configuration.set("timeline.metrics.service.use.groupBy.aggregators", String.valueOf(useGroupByAggregators)); + return configuration; + } + + @Test + public void testShouldAggregateClusterProperly() throws Exception { + // GIVEN + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false)); + TimelineMetricReadHelper readHelper = new 
TimelineMetricReadHelper(false); + + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", + "disk_free", 1)); + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", + "disk_free", 2)); + ctime += 2*minute; + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", + "disk_free", 2)); + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", + "disk_free", 1)); + + // WHEN + long endTime = ctime + minute; + boolean success = agg.doWork(startTime, endTime); + + //THEN + Condition condition = new DefaultCondition(null, null, null, null, startTime, + endTime, null, null, true); + condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), + METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + ResultSet rs = pstmt.executeQuery(); + + int recordCount = 0; + while (rs.next()) { + TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs); + MetricClusterAggregate currentHostAggregate = + readHelper.getMetricClusterAggregateFromResultSet(rs); + + if ("disk_free".equals(currentMetric.getMetricName())) { + assertEquals(2, currentHostAggregate.getNumberOfHosts()); + assertEquals(2.0, currentHostAggregate.getMax()); + assertEquals(1.0, currentHostAggregate.getMin()); + assertEquals(3.0, currentHostAggregate.getSum()); + recordCount++; + } else { + fail("Unexpected entry"); + } + } + } + + @Test + public void testShouldAggregateClusterIgnoringInstance() throws Exception { + // GIVEN + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false)); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); + + long startTime = System.currentTimeMillis(); + 
long ctime = startTime; + long minute = 60 * 1000 * 2; + + /** + * Here we have two nodes with two instances each: + * | local1 | local2 | + * instance i1 | 1 | 2 | + * instance i2 | 3 | 4 | + * + */ + // Four 1's at ctime - 100 + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local1", + "i1", "disk_free", 1)); + // Four 2's at ctime - 100: different host + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local2", + "i1", "disk_free", 2)); + // Avoid overwrite + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local1", + "i2", "disk_free", 3)); + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local2", + "i2", "disk_free", 4)); + + ctime += minute; + + // Four 1's at ctime + 2 min + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local1", + "i1", "disk_free", 1)); + // Four 1's at ctime + 2 min - different host + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local2", + "i1", "disk_free", 3)); + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local1", + "i2", "disk_free", 2)); + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local2", + "i2", "disk_free", 4)); + // WHEN + long endTime = ctime + minute; + boolean success = agg.doWork(startTime - 1000, endTime + 1000); + + //THEN + Condition condition = new DefaultCondition(null, null, null, null, startTime, + endTime, null, null, true); + condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), + METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + ResultSet rs = pstmt.executeQuery(); + + int recordCount = 0; + while (rs.next()) { + TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs); + MetricClusterAggregate currentHostAggregate = + readHelper.getMetricClusterAggregateFromResultSet(rs); + + 
if ("disk_free".equals(currentMetric.getMetricName())) { + System.out.println("OUTPUT: " + currentMetric + " - " + currentHostAggregate); + assertEquals(2, currentHostAggregate.getNumberOfHosts()); + assertEquals(5.0, Math.floor(currentHostAggregate.getSum())); + recordCount++; + } else { + fail("Unexpected entry"); + } + } + + Assert.assertEquals(5, recordCount); + } + + @Test + public void testShouldAggregateDifferentMetricsOnClusterProperly() throws Exception { + // GIVEN + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false)); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); + + // here we put some metrics tha will be aggregated + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", + "disk_free", 1)); + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", + "disk_free", 2)); + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", + "disk_used", 1)); + + ctime += 2*minute; + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", + "disk_free", 2)); + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", + "disk_free", 1)); + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", + "disk_used", 1)); + + // WHEN + long endTime = ctime + minute; + boolean success = agg.doWork(startTime, endTime); + + //THEN + Condition condition = new DefaultCondition(null, null, null, null, startTime, + endTime, null, null, true); + condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), + METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + ResultSet rs = pstmt.executeQuery(); + + int recordCount = 0; + while 
(rs.next()) { + TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs); + MetricClusterAggregate currentHostAggregate = + readHelper.getMetricClusterAggregateFromResultSet(rs); + + if ("disk_free".equals(currentMetric.getMetricName())) { + assertEquals(2, currentHostAggregate.getNumberOfHosts()); + assertEquals(2.0, currentHostAggregate.getMax()); + assertEquals(1.0, currentHostAggregate.getMin()); + assertEquals(3.0, currentHostAggregate.getSum()); + recordCount++; + } else if ("disk_used".equals(currentMetric.getMetricName())) { + assertEquals(1, currentHostAggregate.getNumberOfHosts()); + assertEquals(1.0, currentHostAggregate.getMax()); + assertEquals(1.0, currentHostAggregate.getMin()); + assertEquals(1.0, currentHostAggregate.getSum()); + recordCount++; + } else { + fail("Unexpected entry"); + } + } + } + + @Test + public void testAggregateDailyClusterMetrics() throws Exception { + // GIVEN + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hdb, getConfigurationForTest(false)); + + // this time can be virtualized! 
or made independent from real clock + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long hour = 3600 * 1000; + + Map<TimelineClusterMetric, MetricHostAggregate> records = + new HashMap<TimelineClusterMetric, MetricHostAggregate>(); + + records.put(createEmptyTimelineClusterMetric(ctime), + MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0)); + records.put(createEmptyTimelineClusterMetric(ctime += hour), + MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0)); + records.put(createEmptyTimelineClusterMetric(ctime += hour), + MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0)); + records.put(createEmptyTimelineClusterMetric(ctime += hour), + MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0)); + + + hdb.saveClusterTimeAggregateRecords(records, METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME); + + // WHEN + agg.doWork(startTime, ctime + hour + 1000); + + // THEN + ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_DAILY"); + int count = 0; + while (rs.next()) { + assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME")); + assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); + assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM")); + assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); + assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); + assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); + count++; + } + + assertEquals("Day aggregated row expected ", 1, count); + } + + @Test + public void testShouldAggregateClusterOnMinuteProperly() throws Exception { + + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false)); + + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long second = 1000; + long minute = 60*second; + + Map<TimelineClusterMetric, MetricClusterAggregate> records = + new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); + + 
records.put(createEmptyTimelineClusterMetric(ctime), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric(ctime += second), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric(ctime += second), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric(ctime += second), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + + hdb.saveClusterAggregateRecords(records); + agg.doWork(startTime, ctime + second); + long oldCtime = ctime + second; + + //Next minute + ctime = startTime + minute; + + records.put(createEmptyTimelineClusterMetric(ctime), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric(ctime += second), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric(ctime += second), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric(ctime += second), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + + hdb.saveClusterAggregateRecords(records); + agg.doWork(oldCtime, ctime + second); + + ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_MINUTE"); + int count = 0; + long diff = 0 ; + while (rs.next()) { + assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME")); + assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); + assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM")); + assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); + assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); + assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); + if (count == 0) { + diff+=rs.getLong("SERVER_TIME"); + } else { + diff-=rs.getLong("SERVER_TIME"); + if (diff < 0) { + diff*=-1; + } + assertTrue(diff == minute); + } + count++; + } + + assertEquals("One hourly aggregated row expected ", 2, count); + } + + @Test + public void 
testShouldAggregateClusterOnHourProperly() throws Exception { + // GIVEN + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false)); + + // this time can be virtualized! or made independent from real clock + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + + Map<TimelineClusterMetric, MetricClusterAggregate> records = + new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); + + records.put(createEmptyTimelineClusterMetric(ctime), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric(ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric(ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric(ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + + hdb.saveClusterAggregateRecords(records); + + // WHEN + agg.doWork(startTime, ctime + minute); + + // THEN + ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY"); + int count = 0; + while (rs.next()) { + assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME")); + assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); + assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM")); + assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); + assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); + assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); + count++; + } + + assertEquals("One hourly aggregated row expected ", 1, count); + } + + @Test + public void testShouldAggregateDifferentMetricsOnHourProperly() throws Exception { + // GIVEN + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false)); + + long startTime = System.currentTimeMillis(); + long ctime = 
startTime; + long minute = 60 * 1000; + + Map<TimelineClusterMetric, MetricClusterAggregate> records = + new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); + + records.put(createEmptyTimelineClusterMetric("disk_used", ctime), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric("disk_free", ctime), + new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); + + records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric("disk_free", ctime), + new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); + + records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric("disk_free", ctime), + new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); + + records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric("disk_free", ctime), + new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); + + hdb.saveClusterAggregateRecords(records); + + // WHEN + agg.doWork(startTime, ctime + minute); + + // THEN + ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY"); + int count = 0; + while (rs.next()) { + if ("disk_used".equals(rs.getString("METRIC_NAME"))) { + assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); + assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM")); + assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); + assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); + assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); + } else if ("disk_free".equals(rs.getString("METRIC_NAME"))) { + assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); + assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM")); + assertEquals("METRIC_COUNT", 8, 
rs.getLong("METRIC_COUNT")); + assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX")); + assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN")); + } + + count++; + } + + assertEquals("Two hourly aggregated row expected ", 2, count); + } + + @Test + public void testAppLevelHostMetricAggregates() throws Exception { + Configuration conf = getConfigurationForTest(false); + conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1"); + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, conf); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); + + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + hdb.insertMetricRecords(prepareSingleTimelineMetric((ctime), "local1", + "app1", null, "app_metric_random", 1)); + ctime += 10; + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", + "cpu_user", 1)); + ctime += 10; + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", + "cpu_user", 2)); + + // WHEN + long endTime = ctime + minute; + boolean success = agg.doWork(startTime, endTime); + + //THEN + Condition condition = new DefaultCondition( + Collections.singletonList("cpu_user"), null, "app1", null, + startTime, endTime, null, null, true); + condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), + METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt + (conn, condition); + ResultSet rs = pstmt.executeQuery(); + + int recordCount = 0; + TimelineClusterMetric currentMetric = null; + MetricClusterAggregate currentHostAggregate = null; + while (rs.next()) { + currentMetric = metricReader.fromResultSet(rs); + currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs); + recordCount++; + } + assertEquals(3, recordCount); + assertNotNull(currentMetric); + 
assertEquals("cpu_user", currentMetric.getMetricName()); + assertEquals("app1", currentMetric.getAppId()); + assertNotNull(currentHostAggregate); + assertEquals(1, currentHostAggregate.getNumberOfHosts()); + assertEquals(1.0d, currentHostAggregate.getSum()); + } + + @Test + public void testClusterAggregateMetricNormalization() throws Exception { + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, getConfigurationForTest(false)); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); + + // Sample data + TimelineMetric metric1 = new TimelineMetric(); + metric1.setMetricName("yarn.ClusterMetrics.NumActiveNMs"); + metric1.setAppId("resourcemanager"); + metric1.setHostName("h1"); + metric1.setStartTime(1431372311811l); + metric1.setMetricValues(new TreeMap<Long, Double>() {{ + put(1431372311811l, 1.0); + put(1431372321811l, 1.0); + put(1431372331811l, 1.0); + put(1431372341811l, 1.0); + put(1431372351811l, 1.0); + put(1431372361811l, 1.0); + put(1431372371810l, 1.0); + }}); + + TimelineMetric metric2 = new TimelineMetric(); + metric2.setMetricName("yarn.ClusterMetrics.NumActiveNMs"); + metric2.setAppId("resourcemanager"); + metric2.setHostName("h1"); + metric2.setStartTime(1431372381810l); + metric2.setMetricValues(new TreeMap<Long, Double>() {{ + put(1431372381810l, 1.0); + put(1431372391811l, 1.0); + put(1431372401811l, 1.0); + put(1431372411811l, 1.0); + put(1431372421811l, 1.0); + put(1431372431811l, 1.0); + put(1431372441810l, 1.0); + }}); + + TimelineMetrics metrics = new TimelineMetrics(); + metrics.setMetrics(Collections.singletonList(metric1)); + insertMetricRecords(conn, metrics, 1431372371810l); + + metrics.setMetrics(Collections.singletonList(metric2)); + insertMetricRecords(conn, metrics, 1431372441810l); + + long startTime = 1431372055000l; + long endTime = 1431372655000l; + + agg.doWork(startTime, endTime); + + Condition condition = new DefaultCondition(null, null, null, 
null, startTime, + endTime, null, null, true); + condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), + METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + ResultSet rs = pstmt.executeQuery(); + + int recordCount = 0; + while (rs.next()) { + TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs); + MetricClusterAggregate currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs); + + if ("yarn.ClusterMetrics.NumActiveNMs".equals(currentMetric.getMetricName())) { + assertEquals(1, currentHostAggregate.getNumberOfHosts()); + assertEquals(1.0, currentHostAggregate.getMax()); + assertEquals(1.0, currentHostAggregate.getMin()); + assertEquals(1.0, currentHostAggregate.getSum()); + recordCount++; + } else { + fail("Unexpected entry"); + } + } + Assert.assertEquals(5, recordCount); + } + + @Test + public void testAggregationUsingGroupByQuery() throws Exception { + // GIVEN + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(true)); + + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + + Map<TimelineClusterMetric, MetricClusterAggregate> records = + new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); + + records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric("disk_free", ctime), + new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); + + records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric("disk_free", ctime), + new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); + + 
records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric("disk_free", ctime), + new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); + + records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineClusterMetric("disk_free", ctime), + new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); + + hdb.saveClusterAggregateRecords(records); + + // WHEN + agg.doWork(startTime, ctime + minute); + + // THEN + ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY"); + int count = 0; + while (rs.next()) { + if ("disk_used".equals(rs.getString("METRIC_NAME"))) { + assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); + assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM")); + assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); + assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); + assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); + } else if ("disk_free".equals(rs.getString("METRIC_NAME"))) { + assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); + assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM")); + assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); + assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX")); + assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN")); + } + count++; + } + assertEquals("Two hourly aggregated row expected ", 2, count); + } + + private ResultSet executeQuery(String query) throws SQLException { + Connection conn = getConnection(getUrl()); + Statement stmt = conn.createStatement(); + return stmt.executeQuery(query); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/2c7aec60/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java 
---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java new file mode 100644 index 0000000..9c7c8fa --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java @@ -0,0 +1,373 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.fail; +import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineMetric; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; +import static org.assertj.core.api.Assertions.assertThat; + +public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { + + @Test + public void testShouldInsertMetrics() throws Exception { + // GIVEN + + // WHEN + long startTime = System.currentTimeMillis(); + TimelineMetrics metricsSent = prepareTimelineMetrics(startTime, "local"); + hdb.insertMetricRecords(metricsSent); + + Condition queryCondition = new DefaultCondition(null, + Collections.singletonList("local"), null, null, startTime, + startTime + (15 * 60 * 1000), null, null, false); + TimelineMetrics recordRead = hdb.getMetricRecords(queryCondition, null); + + // THEN + assertThat(recordRead.getMetrics()).hasSize(2) + .extracting("metricName") + .containsOnly("mem_free", "disk_free"); + + assertThat(metricsSent.getMetrics()) + .usingElementComparator(TIME_IGNORING_COMPARATOR) + .containsExactlyElementsOf(recordRead.getMetrics()); + } + + private Configuration getConfigurationForTest(boolean useGroupByAggregators) { + Configuration configuration = new Configuration(); + configuration.set("timeline.metrics.service.use.groupBy.aggregators", 
String.valueOf(useGroupByAggregators)); + return configuration; + } + + @Test + public void testShouldAggregateMinuteProperly() throws Exception { + // GIVEN + TimelineMetricAggregator aggregatorMinute = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb, + getConfigurationForTest(false)); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); + + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local")); + hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local")); + hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local")); + hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local")); + hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local")); + + // WHEN + long endTime = startTime + 1000 * 60 * 4; + boolean success = aggregatorMinute.doWork(startTime, endTime); + + //THEN + Condition condition = new DefaultCondition(null, null, null, null, startTime, + endTime, null, null, true); + condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), + METRICS_AGGREGATE_MINUTE_TABLE_NAME)); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + ResultSet rs = pstmt.executeQuery(); + MetricHostAggregate expectedAggregate = + MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0); + + int count = 0; + while (rs.next()) { + TimelineMetric currentMetric = + readHelper.getTimelineMetricKeyFromResultSet(rs); + MetricHostAggregate currentHostAggregate = + readHelper.getMetricHostAggregateFromResultSet(rs); + + if ("disk_free".equals(currentMetric.getMetricName())) { + assertEquals(2.0, currentHostAggregate.getMax()); + assertEquals(0.0, currentHostAggregate.getMin()); + assertEquals(20, currentHostAggregate.getNumberOfSamples()); + 
assertEquals(15.0, currentHostAggregate.getSum()); + assertEquals(15.0 / 20, currentHostAggregate.getAvg()); + count++; + } else if ("mem_free".equals(currentMetric.getMetricName())) { + assertEquals(2.0, currentHostAggregate.getMax()); + assertEquals(0.0, currentHostAggregate.getMin()); + assertEquals(20, currentHostAggregate.getNumberOfSamples()); + assertEquals(15.0, currentHostAggregate.getSum()); + assertEquals(15.0 / 20, currentHostAggregate.getAvg()); + count++; + } else { + fail("Unexpected entry"); + } + } + assertEquals("Two aggregated entries expected", 2, count); + } + + @Test + public void testShouldAggregateHourProperly() throws Exception { + // GIVEN + TimelineMetricAggregator aggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb, + getConfigurationForTest(false)); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); + long startTime = System.currentTimeMillis(); + + MetricHostAggregate expectedAggregate = + MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0); + Map<TimelineMetric, MetricHostAggregate> + aggMap = new HashMap<TimelineMetric, + MetricHostAggregate>(); + + int min_5 = 5 * 60 * 1000; + long ctime = startTime - min_5; + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + 
aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + + hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_MINUTE_TABLE_NAME); + + //WHEN + long endTime = ctime + min_5; + boolean success = aggregator.doWork(startTime, endTime); + assertTrue(success); + + //THEN + Condition condition = new DefaultCondition(null, null, null, null, startTime, + endTime, null, null, true); + condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), + METRICS_AGGREGATE_HOURLY_TABLE_NAME)); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + ResultSet rs = pstmt.executeQuery(); + + while (rs.next()) { + TimelineMetric currentMetric = + readHelper.getTimelineMetricKeyFromResultSet(rs); + MetricHostAggregate currentHostAggregate = + readHelper.getMetricHostAggregateFromResultSet(rs); + + if ("disk_used".equals(currentMetric.getMetricName())) { + assertEquals(2.0, currentHostAggregate.getMax()); + assertEquals(0.0, currentHostAggregate.getMin()); + assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples()); + assertEquals(12 * 15.0, currentHostAggregate.getSum()); + assertEquals(15.0 / 20, currentHostAggregate.getAvg()); + } + } + } + + @Test + public void testMetricAggregateDaily() throws Exception { + // GIVEN + TimelineMetricAggregator aggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hdb, + getConfigurationForTest(false)); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); + long startTime = System.currentTimeMillis(); + + MetricHostAggregate expectedAggregate = + MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0); + Map<TimelineMetric, MetricHostAggregate> + aggMap = new HashMap<TimelineMetric, MetricHostAggregate>(); + + int min_5 = 5 * 60 * 1000; + long ctime = startTime - 
min_5; + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + + hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_HOURLY_TABLE_NAME); + + //WHEN + long endTime = ctime + min_5; + boolean success = aggregator.doWork(startTime, endTime); + assertTrue(success); + + //THEN + Condition condition = new DefaultCondition(null, null, null, null, startTime, + endTime, null, null, true); + condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), + METRICS_AGGREGATE_DAILY_TABLE_NAME)); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + ResultSet rs = pstmt.executeQuery(); + + while (rs.next()) { + TimelineMetric currentMetric = + readHelper.getTimelineMetricKeyFromResultSet(rs); + MetricHostAggregate currentHostAggregate = + readHelper.getMetricHostAggregateFromResultSet(rs); + + if ("disk_used".equals(currentMetric.getMetricName())) { + assertEquals(2.0, currentHostAggregate.getMax()); + assertEquals(0.0, currentHostAggregate.getMin()); + assertEquals(12 * 20, 
currentHostAggregate.getNumberOfSamples()); + assertEquals(12 * 15.0, currentHostAggregate.getSum()); + assertEquals(15.0 / 20, currentHostAggregate.getAvg()); + } + } + } + + @Test + public void testAggregationUsingGroupByQuery() throws Exception { + // GIVEN + TimelineMetricAggregator aggregatorMinute = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb, + getConfigurationForTest(true)); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); + + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local")); + hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local")); + hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local")); + hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local")); + hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local")); + + long endTime = startTime + 1000 * 60 * 4; + boolean success = aggregatorMinute.doWork(startTime, endTime); + assertTrue(success); + + Condition condition = new DefaultCondition(null, null, null, null, startTime, + endTime, null, null, true); + condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), + METRICS_AGGREGATE_MINUTE_TABLE_NAME)); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + ResultSet rs = pstmt.executeQuery(); + MetricHostAggregate expectedAggregate = + MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0); + + int count = 0; + while (rs.next()) { + TimelineMetric currentMetric = + readHelper.getTimelineMetricKeyFromResultSet(rs); + MetricHostAggregate currentHostAggregate = + readHelper.getMetricHostAggregateFromResultSet(rs); + + if ("disk_free".equals(currentMetric.getMetricName())) { + assertEquals(2.0, currentHostAggregate.getMax()); + 
assertEquals(0.0, currentHostAggregate.getMin()); + assertEquals(20, currentHostAggregate.getNumberOfSamples()); + assertEquals(15.0, currentHostAggregate.getSum()); + assertEquals(15.0 / 20, currentHostAggregate.getAvg()); + count++; + } else if ("mem_free".equals(currentMetric.getMetricName())) { + assertEquals(2.0, currentHostAggregate.getMax()); + assertEquals(0.0, currentHostAggregate.getMin()); + assertEquals(20, currentHostAggregate.getNumberOfSamples()); + assertEquals(15.0, currentHostAggregate.getSum()); + assertEquals(15.0 / 20, currentHostAggregate.getAvg()); + count++; + } else { + fail("Unexpected entry"); + } + } + assertEquals("Two aggregated entries expected", 2, count); + } + + private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR = + new Comparator<TimelineMetric>() { + @Override + public int compare(TimelineMetric o1, TimelineMetric o2) { + return o1.equalsExceptTime(o2) ? 0 : 1; + } + }; + + private TimelineMetrics prepareTimelineMetrics(long startTime, String host) { + TimelineMetrics metrics = new TimelineMetrics(); + metrics.setMetrics(Arrays.asList( + createMetric(startTime, "disk_free", host), + createMetric(startTime, "mem_free", host))); + + return metrics; + } + + private TimelineMetric createMetric(long startTime, String metricName, String host) { + TimelineMetric m = new TimelineMetric(); + m.setAppId("host"); + m.setHostName(host); + m.setMetricName(metricName); + m.setStartTime(startTime); + TreeMap<Long, Double> vals = new TreeMap<Long, Double>(); + vals.put(startTime + 15000l, 0.0); + vals.put(startTime + 30000l, 0.0); + vals.put(startTime + 45000l, 1.0); + vals.put(startTime + 60000l, 2.0); + + m.setMetricValues(vals); + + return m; + } + +}