http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/TestRestMetricsSender.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/TestRestMetricsSender.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/TestRestMetricsSender.java new file mode 100644 index 0000000..4411be5 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/TestRestMetricsSender.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.net; + +import org.junit.Test; + +import java.io.IOException; + +import static org.easymock.EasyMock.*; +import static org.junit.Assert.assertEquals; + +public class TestRestMetricsSender { + + @Test + public void testPushMetrics() throws Exception { + final UrlService svcMock = createStrictMock(UrlService.class); + final String payload = "test"; + final String expectedResponse = "mockResponse"; + + expect(svcMock.send(anyString())).andReturn(expectedResponse); + svcMock.disconnect(); + expectLastCall(); + + replay(svcMock); + + RestMetricsSender sender = new RestMetricsSender("expectedHostName") { + @Override + protected UrlService getConnectedUrlService() throws IOException { + return svcMock; + } + }; + String response = sender.pushMetrics(payload); + + verify(svcMock); + assertEquals("", expectedResponse, response); + } + + @Test + public void testPushMetricsFailed() throws Exception { + final UrlService svcMock = createStrictMock(UrlService.class); + final String payload = "test"; + final String expectedResponse = "mockResponse"; + RestMetricsSender sender = new RestMetricsSender("expectedHostName") { + @Override + protected UrlService getConnectedUrlService() throws IOException { + return svcMock; + } + }; + + expect(svcMock.send(anyString())).andThrow(new IOException()); + svcMock.disconnect(); + expectLastCall(); + + replay(svcMock); + + String response = sender.pushMetrics(payload); + + verify(svcMock); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/TestStdOutMetricsSender.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/TestStdOutMetricsSender.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/TestStdOutMetricsSender.java new file mode 100644 index 0000000..7e29ae3 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/TestStdOutMetricsSender.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimulator.net; + + +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; + +public class TestStdOutMetricsSender { + + @Test + public void testPushMetrics() throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream out = new PrintStream(baos); + StdOutMetricsSender sender = new StdOutMetricsSender("expectedHostName", out); + sender.pushMetrics("test"); + + System.out.println(baos.toString()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestRandomMetricsProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestRandomMetricsProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestRandomMetricsProvider.java new file mode 100644 index 0000000..462aaf0 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestRandomMetricsProvider.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimulator.util; + +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +public class TestRandomMetricsProvider { + + @Test + public void testReturnSingle() { + double from = 5.25; + double to = 5.40; + RandomMetricsProvider provider = new RandomMetricsProvider(from, to); + double metric = provider.next(); + + assertTrue("Generated metric should be in range", from < metric && metric < to); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestTimeStampProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestTimeStampProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestTimeStampProvider.java new file mode 100644 index 0000000..dd513aa --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestTimeStampProvider.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimulator.util; + +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; + +public class TestTimeStampProvider { + + @Test + public void testReturnSingle() { + long startTime = 1411663170112L; + int timeStep = 5000; + TimeStampProvider tm = new TimeStampProvider(startTime, timeStep, 0); + + long tStamp = tm.next(); + + assertEquals("First generated timestamp should match starttime", startTime, tStamp); + } + + @Test + public void testReturnTstampsForSendInterval() throws Exception { + long startTime = 0; + int collectInterval = 5; + int sendInterval = 30; + TimeStampProvider tsp = new TimeStampProvider(startTime, collectInterval, sendInterval); + + long[] timestamps = tsp.timestampsForNextInterval(); + + assertThat(timestamps) + .hasSize(6) + .containsOnly(0, 5, 10, 15, 20, 25); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java new file mode 100644 index 0000000..96b8a83 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + +import com.google.common.collect.Maps; +import org.apache.hadoop.hbase.HConstants; +import org.apache.phoenix.hbase.index.write.IndexWriterUtils; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class AbstractMiniHBaseClusterTest extends BaseTest { + + protected static final long BATCH_SIZE = 3; + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> props = getDefaultProps(); + props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000)); + props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100)); + // Make a small batch size to test multiple calls to reserve sequences + props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB, + Long.toString(BATCH_SIZE)); + // Must update config before starting server + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @AfterClass + public static void doTeardown() throws Exception { + dropNonSystemTables(); + } + + @After + public void cleanUpAfterTest() throws Exception { + deletePriorTables(HConstants.LATEST_TIMESTAMP, getUrl()); + } + + public static Map<String, String> getDefaultProps() { + Map<String, String> props = new HashMap<String, String>(); + // Must update config before starting server + props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, + Boolean.FALSE.toString()); + props.put("java.security.krb5.realm", ""); + props.put("java.security.krb5.kdc", ""); + return props; + } + + protected Connection getConnection(String url) throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + return conn; + } + + /** + * A canary test. Will show if the infrastructure is set-up correctly. + */ + @Test + public void testClusterOK() throws Exception { + Connection conn = getConnection(getUrl()); + conn.setAutoCommit(true); + + String sampleDDL = "CREATE TABLE TEST_METRICS " + + "(TEST_COLUMN VARCHAR " + + "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) " + + "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, " + + "TTL=86400, COMPRESSION='NONE' "; + + Statement stmt = conn.createStatement(); + stmt.executeUpdate(sampleDDL); + conn.commit(); + + ResultSet rs = stmt.executeQuery( + "SELECT COUNT(TEST_COLUMN) FROM TEST_METRICS"); + + rs.next(); + long l = rs.getLong(1); + assertThat(l).isGreaterThanOrEqualTo(0); + + stmt.execute("DROP TABLE TEST_METRICS"); + conn.close(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractPhoenixConnectionlessTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractPhoenixConnectionlessTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractPhoenixConnectionlessTest.java new file mode 100644 index 0000000..1430478 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractPhoenixConnectionlessTest.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + +import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver; +import org.apache.phoenix.jdbc.PhoenixTestDriver; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public abstract class AbstractPhoenixConnectionlessTest extends BaseTest { + + protected static String getUrl() { + return TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL; + } + + protected static String getUrl(String tenantId) { + return getUrl() + ';' + TENANT_ID_ATTRIB + '=' + tenantId; + } + + protected static PhoenixTestDriver driver; + + private static void startServer(String url) throws Exception { + assertNull(driver); + // only load the test driver if we are testing locally - for integration tests, we want to + // test on a wider scale + if (PhoenixEmbeddedDriver.isTestUrl(url)) { + driver = initDriver(ReadOnlyProps.EMPTY_PROPS); + assertTrue(DriverManager.getDriver(url) == driver); + driver.connect(url, PropertiesUtil.deepCopy(TEST_PROPERTIES)); + } + } + + protected static synchronized PhoenixTestDriver initDriver(ReadOnlyProps props) throws Exception { + if (driver == null) { + driver = new PhoenixTestDriver(props); + DriverManager.registerDriver(driver); + } + return driver; + } + + private String connUrl; + + @Before + public void setup() throws Exception { + connUrl = getUrl(); + startServer(connUrl); + } + + @Test + public void testStorageSystemInitialized() throws Exception { + String sampleDDL = "CREATE TABLE TEST_METRICS (TEST_COLUMN VARCHAR " + + "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) DATA_BLOCK_ENCODING='FAST_DIFF', " + + "IMMUTABLE_ROWS=true, TTL=86400, COMPRESSION='SNAPPY'"; + + Connection conn = null; + PreparedStatement stmt = null; + try { + conn = DriverManager.getConnection(connUrl); + stmt = conn.prepareStatement(sampleDDL); + stmt.execute(); + conn.commit(); + } finally { + if (stmt != null) { + stmt.close(); + } + if (conn != null) { + conn.close(); + } + } + } + + @After + public void tearDown() throws Exception { + if (driver != null) { + try { + driver.close(); + } finally { + PhoenixTestDriver phoenixTestDriver = driver; + driver = null; + DriverManager.deregisterDriver(phoenixTestDriver); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java new file mode 100644 index 0000000..f7e53f5 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java @@ -0,0 +1,384 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.fail; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LOG; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; + +public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { + private Connection conn; + private PhoenixHBaseAccessor hdb; + + @Before + public void setUp() throws Exception { + hdb = createTestableHBaseAccessor(); + // inits connection, starts mini cluster + conn = getConnection(getUrl()); + + hdb.initMetricSchema(); + } + + @After + public void tearDown() throws Exception { + Connection conn = getConnection(getUrl()); + Statement stmt = conn.createStatement(); + + stmt.execute("delete from METRIC_AGGREGATE"); + stmt.execute("delete from METRIC_AGGREGATE_HOURLY"); + stmt.execute("delete from METRIC_RECORD"); + stmt.execute("delete from METRIC_RECORD_HOURLY"); + stmt.execute("delete from METRIC_RECORD_MINUTE"); + conn.commit(); + + stmt.close(); + conn.close(); + } + + @Test + public void testShouldAggregateClusterProperly() throws Exception { + // GIVEN + TimelineMetricClusterAggregator agg = + new TimelineMetricClusterAggregator(hdb, new Configuration()); + + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", + "disk_free", 1)); + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", + "disk_free", 2)); + ctime += minute; + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", + "disk_free", 2)); + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", + "disk_free", 1)); + + // WHEN + long endTime = ctime + minute; + boolean success = agg.doWork(startTime, endTime); + + //THEN + Condition condition = new Condition(null, null, null, null, startTime, + endTime, null, true); + condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA))); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt + (conn, condition); + ResultSet rs = pstmt.executeQuery(); + + int recordCount = 0; + while (rs.next()) { + TimelineClusterMetric currentMetric = + PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs); + MetricClusterAggregate currentHostAggregate = + PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs); + + if ("disk_free".equals(currentMetric.getMetricName())) { + assertEquals(2, currentHostAggregate.getNumberOfHosts()); + assertEquals(2.0, currentHostAggregate.getMax()); + assertEquals(1.0, currentHostAggregate.getMin()); + assertEquals(3.0, currentHostAggregate.getSum()); + recordCount++; + } else { + fail("Unexpected entry"); + } + } + } + + + @Test + public void testShouldAggregateDifferentMetricsOnClusterProperly() + throws Exception { + // GIVEN + TimelineMetricClusterAggregator agg = + new TimelineMetricClusterAggregator(hdb, new Configuration()); + + // here we put some metrics tha will be aggregated + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", + "disk_free", 1)); + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", + "disk_free", 2)); + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", + "disk_used", 1)); + + ctime += minute; + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", + "disk_free", 2)); + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", + "disk_free", 1)); + hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", + "disk_used", 1)); + + // WHEN + long endTime = ctime + minute; + boolean success = agg.doWork(startTime, endTime); + + //THEN + Condition condition = new Condition(null, null, null, null, startTime, + endTime, null, true); + condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA))); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt + (conn, condition); + ResultSet rs = pstmt.executeQuery(); + + int recordCount = 0; + while (rs.next()) { + TimelineClusterMetric currentMetric = + PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs); + MetricClusterAggregate currentHostAggregate = + PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs); + + if ("disk_free".equals(currentMetric.getMetricName())) { + assertEquals(2, currentHostAggregate.getNumberOfHosts()); + assertEquals(2.0, currentHostAggregate.getMax()); + assertEquals(1.0, currentHostAggregate.getMin()); + assertEquals(3.0, currentHostAggregate.getSum()); + recordCount++; + } else if ("disk_used".equals(currentMetric.getMetricName())) { + assertEquals(1, currentHostAggregate.getNumberOfHosts()); + assertEquals(1.0, currentHostAggregate.getMax()); + assertEquals(1.0, currentHostAggregate.getMin()); + assertEquals(1.0, currentHostAggregate.getSum()); + recordCount++; + } else { + fail("Unexpected entry"); + } + } + } + + + @Test + public void testShouldAggregateClusterOnHourProperly() throws Exception { + // GIVEN + TimelineMetricClusterAggregatorHourly agg = + new TimelineMetricClusterAggregatorHourly(hdb, new Configuration()); + + // this time can be virtualized! or made independent from real clock + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + + Map<TimelineClusterMetric, MetricClusterAggregate> records = + new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); + + records.put(createEmptyTimelineMetric(ctime), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineMetric(ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineMetric(ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineMetric(ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + + hdb.saveClusterAggregateRecords(records); + + // WHEN + agg.doWork(startTime, ctime + minute); + + // THEN + ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY"); + int count = 0; + while (rs.next()) { + assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME")); + assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); + assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM")); + assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); + assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); + assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); + count++; + } + + assertEquals("One hourly aggregated row expected ", 1, count); + } + + @Test + public void testShouldAggregateDifferentMetricsOnHourProperly() throws + Exception { + // GIVEN + TimelineMetricClusterAggregatorHourly agg = + new TimelineMetricClusterAggregatorHourly(hdb, new Configuration()); + + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + + Map<TimelineClusterMetric, MetricClusterAggregate> records = + new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); + + records.put(createEmptyTimelineMetric("disk_used", ctime), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineMetric("disk_free", ctime), + new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); + + records.put(createEmptyTimelineMetric("disk_used", ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineMetric("disk_free", ctime), + new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); + + records.put(createEmptyTimelineMetric("disk_used", ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineMetric("disk_free", ctime), + new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); + + records.put(createEmptyTimelineMetric("disk_used", ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(createEmptyTimelineMetric("disk_free", ctime), + new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); + + hdb.saveClusterAggregateRecords(records); + + // WHEN + agg.doWork(startTime, ctime + minute); + + // THEN + ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY"); + int count = 0; + while (rs.next()) { + if ("disk_used".equals(rs.getString("METRIC_NAME"))) { + assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); + assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM")); + assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); + assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); + assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); + } else if ("disk_free".equals(rs.getString("METRIC_NAME"))) { + assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); + assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM")); + assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); + assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX")); + assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN")); + } + + count++; + } + + assertEquals("Two hourly aggregated row expected ", 2, count); + } + + private ResultSet executeQuery(String query) throws SQLException { + Connection conn = getConnection(getUrl()); + Statement stmt = conn.createStatement(); + return stmt.executeQuery(query); + } + + private TimelineClusterMetric createEmptyTimelineMetric(String name, + long startTime) { + TimelineClusterMetric metric = new TimelineClusterMetric(name, + "test_app", null, startTime, null); + + return metric; + } + + private TimelineClusterMetric createEmptyTimelineMetric(long startTime) { + return createEmptyTimelineMetric("disk_used", startTime); + } + + private MetricHostAggregate + createMetricHostAggregate(double max, double min, int numberOfSamples, + double sum) { + MetricHostAggregate expectedAggregate = + new MetricHostAggregate(); + expectedAggregate.setMax(max); + expectedAggregate.setMin(min); + expectedAggregate.setNumberOfSamples(numberOfSamples); + expectedAggregate.setSum(sum); + + return expectedAggregate; + } + + private PhoenixHBaseAccessor createTestableHBaseAccessor() { + Configuration metricsConf = new Configuration(); + metricsConf.set( + TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE"); + + return + new PhoenixHBaseAccessor( + new Configuration(), + metricsConf, + new ConnectionProvider() { + @Override + public Connection getConnection() { + Connection connection = null; + try { + connection = DriverManager.getConnection(getUrl()); + } catch (SQLException e) { + LOG.warn("Unable to connect to HBase store using Phoenix.", e); + } + return connection; + } + }); + } + + private TimelineMetrics prepareSingleTimelineMetric(long startTime, + String host, + String metricName, + double val) { + TimelineMetrics m = new TimelineMetrics(); + m.setMetrics(Arrays.asList( + createTimelineMetric(startTime, metricName, host, val))); + + return m; + } + + private TimelineMetric createTimelineMetric(long startTime, + String metricName, + String host, + double val) { + TimelineMetric m = new TimelineMetric(); + m.setAppId("host"); + m.setHostName(host); + m.setMetricName(metricName); + m.setStartTime(startTime); + Map<Long, Double> vals = new HashMap<Long, Double>(); + vals.put(startTime + 15000l, val); + vals.put(startTime + 30000l, val); + vals.put(startTime + 45000l, val); + vals.put(startTime + 60000l, val); + + m.setMetricValues(vals); + + return m; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java new file mode 100644 index 0000000..d166a22 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.fail; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LOG; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; +import static org.assertj.core.api.Assertions.assertThat; + +public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { + private Connection conn; + private PhoenixHBaseAccessor hdb; + + @Before + public void setUp() throws Exception { + hdb = createTestableHBaseAccessor(); + // inits connection, starts mini cluster + conn = getConnection(getUrl()); + + hdb.initMetricSchema(); + } + + @After + public void tearDown() throws Exception { + Connection conn = getConnection(getUrl()); + Statement stmt = conn.createStatement(); + + stmt.execute("delete from METRIC_AGGREGATE"); + stmt.execute("delete from METRIC_AGGREGATE_HOURLY"); + stmt.execute("delete from METRIC_RECORD"); + stmt.execute("delete from METRIC_RECORD_HOURLY"); + stmt.execute("delete from METRIC_RECORD_MINUTE"); + conn.commit(); + + stmt.close(); + conn.close(); + } + + @Test + public void testShouldInsertMetrics() throws Exception { + // GIVEN + + // WHEN + long startTime = System.currentTimeMillis(); + TimelineMetrics metricsSent = prepareTimelineMetrics(startTime, "local"); + hdb.insertMetricRecords(metricsSent); + + Condition queryCondition = new Condition(null, "local", null, null, + startTime, startTime + (15 * 60 * 1000), null, false); + TimelineMetrics recordRead = hdb.getMetricRecords(queryCondition); + + // THEN + assertThat(recordRead.getMetrics()).hasSize(2) + .extracting("metricName") + .containsOnly("mem_free", "disk_free"); + + assertThat(metricsSent.getMetrics()) + .usingElementComparator(TIME_IGNORING_COMPARATOR) + .containsExactlyElementsOf(recordRead.getMetrics()); + } + + @Test + public void testShouldAggregateMinuteProperly() throws Exception { + // GIVEN +// TimelineMetricAggregatorMinute aggregatorMinute = +// new TimelineMetricAggregatorMinute(hdb, new Configuration()); + TimelineMetricAggregator aggregatorMinute = TimelineMetricAggregatorFactory + .createTimelineMetricAggregatorMinute(hdb, new Configuration()); + + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local")); + hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local")); + hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local")); + hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local")); + hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local")); + + // WHEN + long endTime = startTime + 1000 * 60 * 4; + boolean success = aggregatorMinute.doWork(startTime, endTime); + + //THEN + Condition condition = new Condition(null, null, null, null, startTime, + endTime, null, true); + condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), + METRICS_AGGREGATE_MINUTE_TABLE_NAME)); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt + (conn, condition); + ResultSet rs = pstmt.executeQuery(); + MetricHostAggregate expectedAggregate = + createMetricHostAggregate(2.0, 0.0, 20, 15.0); + + int count = 0; + while (rs.next()) { + TimelineMetric currentMetric = + PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs); + MetricHostAggregate currentHostAggregate = + PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs); + + if ("disk_free".equals(currentMetric.getMetricName())) { + assertEquals(2.0, currentHostAggregate.getMax()); + assertEquals(0.0, currentHostAggregate.getMin()); + assertEquals(20, currentHostAggregate.getNumberOfSamples()); + assertEquals(15.0, currentHostAggregate.getSum()); + assertEquals(15.0 / 20, currentHostAggregate.getAvg()); + count++; + } else if ("mem_free".equals(currentMetric.getMetricName())) { + assertEquals(2.0, currentHostAggregate.getMax()); + assertEquals(0.0, currentHostAggregate.getMin()); + assertEquals(20, currentHostAggregate.getNumberOfSamples()); + assertEquals(15.0, currentHostAggregate.getSum()); + assertEquals(15.0 / 20, currentHostAggregate.getAvg()); + count++; + } else { + fail("Unexpected entry"); + } + } + assertEquals("Two aggregated entries expected", 2, count); + } + + @Test + public void testShouldAggregateHourProperly() throws Exception { + // GIVEN +// TimelineMetricAggregatorHourly aggregator = +// new TimelineMetricAggregatorHourly(hdb, new Configuration()); + + TimelineMetricAggregator aggregator = TimelineMetricAggregatorFactory + .createTimelineMetricAggregatorHourly(hdb, new Configuration()); + long startTime = System.currentTimeMillis(); + + MetricHostAggregate expectedAggregate = + createMetricHostAggregate(2.0, 0.0, 20, 15.0); + Map<TimelineMetric, MetricHostAggregate> + aggMap = new HashMap<TimelineMetric, + MetricHostAggregate>(); + + int min_5 = 5 * 60 * 1000; + long ctime = startTime - min_5; + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + + hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_MINUTE_TABLE_NAME); + + //WHEN + long endTime = ctime + min_5; + boolean success = aggregator.doWork(startTime, endTime); + assertTrue(success); + + //THEN + Condition condition = new Condition(null, null, null, null, startTime, + endTime, null, true); + condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, + PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), + METRICS_AGGREGATE_HOURLY_TABLE_NAME)); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt + (conn, condition); + ResultSet rs = pstmt.executeQuery(); + + while (rs.next()) { + TimelineMetric currentMetric = + PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs); + MetricHostAggregate currentHostAggregate = + PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs); + + if ("disk_used".equals(currentMetric.getMetricName())) { + assertEquals(2.0, currentHostAggregate.getMax()); + assertEquals(0.0, currentHostAggregate.getMin()); + assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples()); + assertEquals(12 * 15.0, currentHostAggregate.getSum()); + assertEquals(15.0 / 20, currentHostAggregate.getAvg()); + } + } + } + + private TimelineMetric createEmptyTimelineMetric(long startTime) { + TimelineMetric metric = new TimelineMetric(); + metric.setMetricName("disk_used"); + metric.setAppId("test_app"); + metric.setHostName("test_host"); + metric.setTimestamp(startTime); + + return metric; + } + + private MetricHostAggregate + createMetricHostAggregate(double max, double min, int numberOfSamples, + double sum) { + MetricHostAggregate expectedAggregate = + new MetricHostAggregate(); + expectedAggregate.setMax(max); + expectedAggregate.setMin(min); + expectedAggregate.setNumberOfSamples(numberOfSamples); + expectedAggregate.setSum(sum); + + return expectedAggregate; + } + + private PhoenixHBaseAccessor createTestableHBaseAccessor() { + Configuration metricsConf = new Configuration(); + metricsConf.set( + TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE"); + + return + new PhoenixHBaseAccessor( + new Configuration(), + metricsConf, + new ConnectionProvider() { + @Override + public Connection getConnection() { + Connection connection = null; + try { + connection = DriverManager.getConnection(getUrl()); + } catch (SQLException e) { + LOG.warn("Unable to connect to HBase store using Phoenix.", e); + } + return connection; + } + }); + } + + private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR = + new Comparator<TimelineMetric>() { + @Override + public int compare(TimelineMetric o1, TimelineMetric o2) { + return o1.equalsExceptTime(o2) ? 0 : 1; + } + }; + + private TimelineMetrics prepareTimelineMetrics(long startTime, String host) { + TimelineMetrics metrics = new TimelineMetrics(); + metrics.setMetrics(Arrays.asList( + createMetric(startTime, "disk_free", host), + createMetric(startTime, "mem_free", host))); + + return metrics; + } + + private TimelineMetric createMetric(long startTime, + String metricName, + String host) { + TimelineMetric m = new TimelineMetric(); + m.setAppId("host"); + m.setHostName(host); + m.setMetricName(metricName); + m.setStartTime(startTime); + Map<Long, Double> vals = new HashMap<Long, Double>(); + vals.put(startTime + 15000l, 0.0); + vals.put(startTime + 30000l, 0.0); + vals.put(startTime + 45000l, 1.0); + vals.put(startTime + 60000l, 2.0); + + m.setMetricValues(vals); + + return m; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java new file mode 100644 index 0000000..0722ccd --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + + +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +import static org.junit.runners.Suite.SuiteClasses; + +@RunWith(Suite.class) +@SuiteClasses({ITMetricAggregator.class, ITClusterAggregator.class}) +public class TestClusterSuite { + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java new file mode 100644 index 0000000..5d8ba96 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline; + +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TestMetricHostAggregate { + + @Test + public void testCreateAggregate() throws Exception { + // given + MetricHostAggregate aggregate = createAggregate(3.0, 1.0, 2.0, 2); + + //then + assertThat(aggregate.getSum()).isEqualTo(3.0); + assertThat(aggregate.getMin()).isEqualTo(1.0); + assertThat(aggregate.getMax()).isEqualTo(2.0); + assertThat(aggregate.getAvg()).isEqualTo(3.0 / 2); + } + + @Test + public void testUpdateAggregates() throws Exception { + // given + MetricHostAggregate aggregate = createAggregate(3.0, 1.0, 2.0, 2); + + //when + aggregate.updateAggregates(createAggregate(8.0, 0.5, 7.5, 2)); + aggregate.updateAggregates(createAggregate(1.0, 1.0, 1.0, 1)); + + //then + assertThat(aggregate.getSum()).isEqualTo(12.0); + assertThat(aggregate.getMin()).isEqualTo(0.5); + assertThat(aggregate.getMax()).isEqualTo(7.5); + assertThat(aggregate.getAvg()).isEqualTo((3.0 + 8.0 + 1.0) / 5); + } + + private MetricHostAggregate createAggregate + (double sum, double min, double max, int samplesCount) { + MetricHostAggregate aggregate = new MetricHostAggregate(); + aggregate.setSum(sum); + aggregate.setMax(max); + aggregate.setMin(min); + aggregate.setDeviation(0.0); + aggregate.setNumberOfSamples(samplesCount); + return aggregate; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java new file mode 100644 index 0000000..758f5a9 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; + +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; + +public class TestPhoenixTransactSQL { + @Test + public void testConditionClause() throws Exception { + Condition condition = new Condition( + Arrays.asList("cpu_user", "mem_free"), "h1", "a1", "i1", + 1407959718L, 1407959918L, null, false); + + String preparedClause = condition.getConditionClause(); + String expectedClause = "METRIC_NAME IN (?, ?) AND HOSTNAME = ? AND " + + "APP_ID = ? AND INSTANCE_ID = ? AND SERVER_TIME >= ? AND SERVER_TIME < ?"; + + Assert.assertNotNull(preparedClause); + Assert.assertEquals(expectedClause, preparedClause); + } + + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java new file mode 100644 index 0000000..c893314 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +public class TestTimelineMetricStore implements TimelineMetricStore { + @Override + public TimelineMetrics getTimelineMetrics(List<String> metricNames, + String hostname, String applicationId, String instanceId, Long startTime, + Long endTime, Integer limit, boolean groupedByHost) throws SQLException, + IOException { + TimelineMetrics timelineMetrics = new TimelineMetrics(); + List<TimelineMetric> metricList = new ArrayList<TimelineMetric>(); + timelineMetrics.setMetrics(metricList); + TimelineMetric metric1 = new TimelineMetric(); + TimelineMetric metric2 = new TimelineMetric(); + metricList.add(metric1); + metricList.add(metric2); + metric1.setMetricName("cpu_user"); + metric1.setAppId("1"); + metric1.setInstanceId(null); + metric1.setHostName("c6401"); + metric1.setStartTime(1407949812L); + metric1.setMetricValues(new HashMap<Long, Double>() {{ + put(1407949812L, 1.0d); + put(1407949912L, 1.8d); + put(1407950002L, 0.7d); + }}); + + metric2.setMetricName("mem_free"); + metric2.setAppId("2"); + metric2.setInstanceId("3"); + metric2.setHostName("c6401"); + metric2.setStartTime(1407949812L); + metric2.setMetricValues(new HashMap<Long, Double>() {{ + put(1407949812L, 2.5d); + put(1407949912L, 3.0d); + put(1407950002L, 0.9d); + }}); + + return timelineMetrics; + } + + @Override + public TimelineMetric getTimelineMetric(String metricName, String hostname, + String applicationId, String instanceId, Long startTime, Long endTime, + Integer limit) throws SQLException, IOException { + + return null; + } + + @Override + public TimelinePutResponse putMetrics(TimelineMetrics metrics) + throws SQLException, IOException { + + return new TimelinePutResponse(); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestGenericObjectMapper.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestGenericObjectMapper.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestGenericObjectMapper.java new file mode 100644 index 0000000..d684a27 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestGenericObjectMapper.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class TestGenericObjectMapper { + + @Test + public void testEncoding() { + testEncoding(Long.MAX_VALUE); + testEncoding(Long.MIN_VALUE); + testEncoding(0l); + testEncoding(128l); + testEncoding(256l); + testEncoding(512l); + testEncoding(-256l); + } + + private static void testEncoding(long l) { + byte[] b = GenericObjectMapper.writeReverseOrderedLong(l); + assertEquals("error decoding", l, + GenericObjectMapper.readReverseOrderedLong(b, 0)); + byte[] buf = new byte[16]; + System.arraycopy(b, 0, buf, 5, 8); + assertEquals("error decoding at offset", l, + GenericObjectMapper.readReverseOrderedLong(buf, 5)); + if (l > Long.MIN_VALUE) { + byte[] a = GenericObjectMapper.writeReverseOrderedLong(l-1); + assertEquals("error preserving ordering", 1, + WritableComparator.compareBytes(a, 0, a.length, b, 0, b.length)); + } + if (l < Long.MAX_VALUE) { + byte[] c = GenericObjectMapper.writeReverseOrderedLong(l+1); + assertEquals("error preserving ordering", 1, + WritableComparator.compareBytes(b, 0, b.length, c, 0, c.length)); + } + } + + private static void verify(Object o) throws IOException { + assertEquals(o, GenericObjectMapper.read(GenericObjectMapper.write(o))); + } + + @Test + public void testValueTypes() throws IOException { + verify(Integer.MAX_VALUE); + verify(Integer.MIN_VALUE); + assertEquals(Integer.MAX_VALUE, GenericObjectMapper.read( + GenericObjectMapper.write((long) Integer.MAX_VALUE))); + assertEquals(Integer.MIN_VALUE, GenericObjectMapper.read( + GenericObjectMapper.write((long) Integer.MIN_VALUE))); + verify((long)Integer.MAX_VALUE + 1l); + verify((long)Integer.MIN_VALUE - 1l); + + verify(Long.MAX_VALUE); + verify(Long.MIN_VALUE); + + assertEquals(42, GenericObjectMapper.read(GenericObjectMapper.write(42l))); + verify(42); + verify(1.23); + verify("abc"); + verify(true); + List<String> list = new ArrayList<String>(); + list.add("123"); + list.add("abc"); + verify(list); + Map<String,String> map = new HashMap<String,String>(); + map.put("k1","v1"); + map.put("k2","v2"); + verify(map); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java new file mode 100644 index 0000000..9b27309 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.iq80.leveldb.DBIterator; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.writeReverseOrderedLong; +import static org.junit.Assert.assertEquals; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class TestLeveldbTimelineStore extends TimelineStoreTestUtils { + private FileContext fsContext; + private File fsPath; + + @Before + public void setup() throws Exception { + fsContext = FileContext.getLocalFSFileContext(); + Configuration conf = new Configuration(); + fsPath = new File("target", this.getClass().getSimpleName() + + "-tmpDir").getAbsoluteFile(); + fsContext.delete(new Path(fsPath.getAbsolutePath()), true); + conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, + fsPath.getAbsolutePath()); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false); + store = new LeveldbTimelineStore(); + store.init(conf); + store.start(); + loadTestData(); + loadVerificationData(); + } + + @After + public void tearDown() throws Exception { + store.stop(); + fsContext.delete(new Path(fsPath.getAbsolutePath()), true); + } + + @Test + public void testGetSingleEntity() throws IOException { + super.testGetSingleEntity(); + ((LeveldbTimelineStore)store).clearStartTimeCache(); + super.testGetSingleEntity(); + loadTestData(); + } + + @Test + public void testGetEntities() throws IOException { + super.testGetEntities(); + } + + @Test + public void testGetEntitiesWithFromId() throws IOException { + super.testGetEntitiesWithFromId(); + } + + @Test + public void testGetEntitiesWithFromTs() throws IOException { + super.testGetEntitiesWithFromTs(); + } + + @Test + public void testGetEntitiesWithPrimaryFilters() throws IOException { + super.testGetEntitiesWithPrimaryFilters(); + } + + @Test + public void testGetEntitiesWithSecondaryFilters() throws IOException { + super.testGetEntitiesWithSecondaryFilters(); + } + + @Test + public void testGetEvents() throws IOException { + super.testGetEvents(); + } + + @Test + public void testCacheSizes() { + Configuration conf = new Configuration(); + assertEquals(10000, LeveldbTimelineStore.getStartTimeReadCacheSize(conf)); + assertEquals(10000, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf)); + conf.setInt( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE, + 10001); + assertEquals(10001, LeveldbTimelineStore.getStartTimeReadCacheSize(conf)); + conf = new Configuration(); + conf.setInt( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE, + 10002); + assertEquals(10002, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf)); + } + + private boolean deleteNextEntity(String entityType, byte[] ts) + throws IOException, InterruptedException { + DBIterator iterator = null; + DBIterator pfIterator = null; + try { + iterator = ((LeveldbTimelineStore)store).getDbIterator(false); + pfIterator = ((LeveldbTimelineStore)store).getDbIterator(false); + return ((LeveldbTimelineStore)store).deleteNextEntity(entityType, ts, + iterator, pfIterator, false); + } finally { + IOUtils.cleanup(null, iterator, pfIterator); + } + } + + @Test + public void testGetEntityTypes() throws IOException { + List<String> entityTypes = ((LeveldbTimelineStore)store).getEntityTypes(); + assertEquals(4, entityTypes.size()); + assertEquals(entityType1, entityTypes.get(0)); + assertEquals(entityType2, entityTypes.get(1)); + assertEquals(entityType4, entityTypes.get(2)); + assertEquals(entityType5, entityTypes.get(3)); + } + + @Test + public void testDeleteEntities() throws IOException, InterruptedException { + assertEquals(2, getEntities("type_1").size()); + assertEquals(1, getEntities("type_2").size()); + + assertEquals(false, deleteNextEntity(entityType1, + writeReverseOrderedLong(122l))); + assertEquals(2, getEntities("type_1").size()); + assertEquals(1, getEntities("type_2").size()); + + assertEquals(true, deleteNextEntity(entityType1, + writeReverseOrderedLong(123l))); + List<TimelineEntity> entities = getEntities("type_2"); + assertEquals(1, entities.size()); + verifyEntityInfo(entityId2, entityType2, events2, Collections.singletonMap( + entityType1, Collections.singleton(entityId1b)), EMPTY_PRIMARY_FILTERS, + EMPTY_MAP, entities.get(0)); + entities = getEntitiesWithPrimaryFilter("type_1", userFilter); + assertEquals(1, entities.size()); + verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(0)); + + ((LeveldbTimelineStore)store).discardOldEntities(-123l); + assertEquals(1, getEntities("type_1").size()); + assertEquals(0, getEntities("type_2").size()); + assertEquals(3, ((LeveldbTimelineStore)store).getEntityTypes().size()); + + ((LeveldbTimelineStore)store).discardOldEntities(123l); + assertEquals(0, getEntities("type_1").size()); + assertEquals(0, getEntities("type_2").size()); + assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size()); + assertEquals(0, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); + } + + @Test + public void testDeleteEntitiesPrimaryFilters() + throws IOException, InterruptedException { + Map<String, Set<Object>> primaryFilter = + Collections.singletonMap("user", Collections.singleton( + (Object) "otheruser")); + TimelineEntities atsEntities = new TimelineEntities(); + atsEntities.setEntities(Collections.singletonList(createEntity(entityId1b, + entityType1, 789l, Collections.singletonList(ev2), null, primaryFilter, + null))); + TimelinePutResponse response = store.put(atsEntities); + assertEquals(0, response.getErrors().size()); + + NameValuePair pfPair = new NameValuePair("user", "otheruser"); + List<TimelineEntity> entities = getEntitiesWithPrimaryFilter("type_1", + pfPair); + assertEquals(1, entities.size()); + verifyEntityInfo(entityId1b, entityType1, Collections.singletonList(ev2), + EMPTY_REL_ENTITIES, primaryFilter, EMPTY_MAP, entities.get(0)); + + entities = getEntitiesWithPrimaryFilter("type_1", userFilter); + assertEquals(2, entities.size()); + verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(0)); + verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + primaryFilters, otherInfo, entities.get(1)); + + ((LeveldbTimelineStore)store).discardOldEntities(-123l); + assertEquals(1, getEntitiesWithPrimaryFilter("type_1", pfPair).size()); + assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); + + ((LeveldbTimelineStore)store).discardOldEntities(123l); + assertEquals(0, getEntities("type_1").size()); + assertEquals(0, getEntities("type_2").size()); + assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size()); + + assertEquals(0, getEntitiesWithPrimaryFilter("type_1", pfPair).size()); + assertEquals(0, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); + } + + @Test + public void testFromTsWithDeletion() + throws IOException, InterruptedException { + long l = System.currentTimeMillis(); + assertEquals(2, getEntitiesFromTs("type_1", l).size()); + assertEquals(1, getEntitiesFromTs("type_2", l).size()); + assertEquals(2, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, + l).size()); + ((LeveldbTimelineStore)store).discardOldEntities(123l); + assertEquals(0, getEntitiesFromTs("type_1", l).size()); + assertEquals(0, getEntitiesFromTs("type_2", l).size()); + assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, + l).size()); + assertEquals(0, getEntities("type_1").size()); + assertEquals(0, getEntities("type_2").size()); + assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, + l).size()); + loadTestData(); + assertEquals(0, getEntitiesFromTs("type_1", l).size()); + assertEquals(0, getEntitiesFromTs("type_2", l).size()); + assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, + l).size()); + assertEquals(2, getEntities("type_1").size()); + assertEquals(1, getEntities("type_2").size()); + assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestMemoryTimelineStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestMemoryTimelineStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestMemoryTimelineStore.java new file mode 100644 index 0000000..415de53 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestMemoryTimelineStore.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +public class TestMemoryTimelineStore extends TimelineStoreTestUtils { + + @Before + public void setup() throws Exception { + store = new MemoryTimelineStore(); + store.init(new YarnConfiguration()); + store.start(); + loadTestData(); + loadVerificationData(); + } + + @After + public void tearDown() throws Exception { + store.stop(); + } + + public TimelineStore getTimelineStore() { + return store; + } + + @Test + public void testGetSingleEntity() throws IOException { + super.testGetSingleEntity(); + } + + @Test + public void testGetEntities() throws IOException { + super.testGetEntities(); + } + + @Test + public void testGetEntitiesWithFromId() throws IOException { + super.testGetEntitiesWithFromId(); + } + + @Test + public void testGetEntitiesWithFromTs() throws IOException { + super.testGetEntitiesWithFromTs(); + } + + @Test + public void testGetEntitiesWithPrimaryFilters() throws IOException { + super.testGetEntitiesWithPrimaryFilters(); + } + + @Test + public void testGetEntitiesWithSecondaryFilters() throws IOException { + super.testGetEntitiesWithSecondaryFilters(); + } + + @Test + public void testGetEvents() throws IOException { + super.testGetEvents(); + } + +}