Repository: storm Updated Branches: refs/heads/master 879cb5b8f -> e6a423dd8
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.storm.metricstore.rocksdb;

import org.apache.commons.io.FileUtils;
import org.apache.storm.DaemonConfig;
import org.apache.storm.metricstore.AggLevel;
import org.apache.storm.metricstore.FilterOptions;
import org.apache.storm.metricstore.Metric;
import org.apache.storm.metricstore.MetricException;
import org.apache.storm.metricstore.MetricStore;
import org.apache.storm.metricstore.MetricStoreConfig;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Integration tests for the RocksDB-backed {@link MetricStore}: metric
 * insertion, aggregation at all {@link AggLevel} bucket sizes, scanning with
 * the various {@link FilterOptions} fields, and retention-based cleanup.
 *
 * <p>A fresh RocksDB database is created in a temporary directory once for
 * the whole suite and removed afterwards.  Inserts are asynchronous, so
 * tests wait for the last inserted metric to become readable before
 * asserting (see {@link #waitForInsertFinish(Metric)}).
 */
public class RocksDbStoreTest {
    static MetricStore store;
    static Path tempDirForTest;

    @BeforeClass
    public static void setUp() throws MetricException, IOException {
        // remove any previously created cache instance so this suite starts clean
        StringMetadataCache.cleanUp();
        tempDirForTest = Files.createTempDirectory("RocksDbStoreTest");
        Map<String, Object> conf = new HashMap<>();
        conf.put(DaemonConfig.STORM_METRIC_STORE_CLASS, "org.apache.storm.metricstore.rocksdb.RocksDbStore");
        conf.put(DaemonConfig.STORM_ROCKSDB_LOCATION, tempDirForTest.toString());
        conf.put(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING, true);
        conf.put(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY, 4000);
        conf.put(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS, 240);
        store = MetricStoreConfig.configure(conf);
    }

    @AfterClass
    public static void tearDown() throws IOException {
        if (store != null) {
            store.close();
        }
        StringMetadataCache.cleanUp();
        FileUtils.deleteDirectory(tempDirForTest.toFile());
    }

    /**
     * Inserts 20 metrics one minute apart and validates the rolled-up sum,
     * average, min, max, and count at each aggregation level (raw, 1 min,
     * 10 min, 60 min) for the bucket starting at time 0.
     */
    @Test
    public void testAggregation() throws Exception {
        double sum0 = 0.0;
        double sum1 = 0.0;
        double sum10 = 0.0;
        double sum60 = 0.0;
        Metric toPopulate = null;
        for (int i = 0; i < 20; i++) {
            double value = 5 + i;
            long timestamp = 1L + i * 60 * 1000;
            Metric m = new Metric("cpu", timestamp, "myTopologyId123", value,
                    "componentId1", "executorId1", "hostname1", "streamid1", 7777, AggLevel.AGG_LEVEL_NONE);
            toPopulate = new Metric(m);
            store.insert(m);

            // track the expected sums for each aggregation bucket starting at time 0
            if (timestamp < 60 * 1000) {
                sum0 += value;
                sum1 += value;
                sum10 += value;
                sum60 += value;
            } else if (timestamp < 600 * 1000) {
                sum10 += value;
                sum60 += value;
            } else {
                sum60 += value;
            }
        }

        waitForInsertFinish(toPopulate);

        // raw (unaggregated) metric at timestamp 1: only the first insert
        toPopulate.setTimestamp(1L);
        toPopulate.setAggLevel(AggLevel.AGG_LEVEL_NONE);
        boolean res = store.populateValue(toPopulate);
        Assert.assertEquals(true, res);
        Assert.assertEquals(sum0, toPopulate.getSum(), 0.001);
        Assert.assertEquals(sum0, toPopulate.getValue(), 0.001);
        Assert.assertEquals(5.0, toPopulate.getMin(), 0.001);
        Assert.assertEquals(5.0, toPopulate.getMax(), 0.001);
        Assert.assertEquals(1, toPopulate.getCount());

        // 1 minute bucket at time 0: still only the first insert falls inside
        toPopulate.setTimestamp(0L);
        toPopulate.setAggLevel(AggLevel.AGG_LEVEL_1_MIN);
        res = store.populateValue(toPopulate);
        Assert.assertEquals(true, res);
        Assert.assertEquals(sum1, toPopulate.getSum(), 0.001);
        Assert.assertEquals(sum1, toPopulate.getValue(), 0.001);
        Assert.assertEquals(5.0, toPopulate.getMin(), 0.001);
        Assert.assertEquals(5.0, toPopulate.getMax(), 0.001);
        Assert.assertEquals(1, toPopulate.getCount());

        // 10 minute bucket at time 0: first 10 inserts (values 5..14)
        toPopulate.setTimestamp(0L);
        toPopulate.setAggLevel(AggLevel.AGG_LEVEL_10_MIN);
        res = store.populateValue(toPopulate);
        Assert.assertEquals(true, res);
        Assert.assertEquals(sum10, toPopulate.getSum(), 0.001);
        Assert.assertEquals(sum10 / 10.0, toPopulate.getValue(), 0.001);
        Assert.assertEquals(5.0, toPopulate.getMin(), 0.001);
        Assert.assertEquals(14.0, toPopulate.getMax(), 0.001);
        Assert.assertEquals(10, toPopulate.getCount());

        // 60 minute bucket at time 0: all 20 inserts (values 5..24)
        toPopulate.setTimestamp(0L);
        toPopulate.setAggLevel(AggLevel.AGG_LEVEL_60_MIN);
        res = store.populateValue(toPopulate);
        Assert.assertEquals(true, res);
        Assert.assertEquals(sum60, toPopulate.getSum(), 0.001);
        Assert.assertEquals(sum60 / 20.0, toPopulate.getValue(), 0.001);
        Assert.assertEquals(5.0, toPopulate.getMin(), 0.001);
        Assert.assertEquals(24.0, toPopulate.getMax(), 0.001);
        Assert.assertEquals(20, toPopulate.getCount());
    }

    /** populateValue() must report false for a metric key that was never stored. */
    @Test
    public void testPopulateFailure() throws Exception {
        Metric m = new Metric("cpu", 3000L, "myTopologyId456", 1.0,
                "componentId2", "executorId2", "hostname2", "streamid2", 7778, AggLevel.AGG_LEVEL_NONE);
        store.insert(m);
        waitForInsertFinish(m);
        Metric toFind = new Metric(m);
        toFind.setTopologyId("somethingBogus");
        boolean res = store.populateValue(toFind);
        Assert.assertEquals(false, res);
    }

    /** Collects all metrics matching the filter into a list via the scan callback. */
    private List<Metric> getMetricsFromScan(FilterOptions filter) throws MetricException {
        List<Metric> list = new ArrayList<>();
        store.scan(filter, (Metric m) -> {
            list.add(m);
        });
        return list;
    }

    /**
     * Validates scanning by each individual filter dimension (time range,
     * topology, metric name, component, executor, host, port, stream) and by
     * a combination of dimensions, plus that all aggregation levels are
     * returned when no AggLevel filter is set.
     */
    @Test
    public void testScan() throws Exception {
        FilterOptions filter;
        List<Metric> list;

        Metric m1 = new Metric("metricType1", 50000000L, "Topo-m1", 1.0,
                "component-1", "executor-2", "hostname-1", "stream-1", 1, AggLevel.AGG_LEVEL_NONE);
        Metric m2 = new Metric("metricType2", 50030000L, "Topo-m1", 1.0,
                "component-1", "executor-1", "hostname-2", "stream-2", 1, AggLevel.AGG_LEVEL_NONE);
        Metric m3 = new Metric("metricType3", 50200000L, "Topo-m1", 1.0,
                "component-2", "executor-1", "hostname-1", "stream-3", 1, AggLevel.AGG_LEVEL_NONE);
        Metric m4 = new Metric("metricType4", 50200000L, "Topo-m2", 1.0,
                "component-2", "executor-1", "hostname-2", "stream-4", 2, AggLevel.AGG_LEVEL_NONE);
        store.insert(m1);
        store.insert(m2);
        store.insert(m3);
        store.insert(m4);
        waitForInsertFinish(m4);

        // validate search by time
        filter = new FilterOptions();
        filter.addAggLevel(AggLevel.AGG_LEVEL_NONE);
        filter.setStartTime(50000000L);
        filter.setEndTime(50130000L);
        list = getMetricsFromScan(filter);
        Assert.assertEquals(2, list.size());
        Assert.assertTrue(list.contains(m1));
        Assert.assertTrue(list.contains(m2));

        // validate search by topology id
        filter = new FilterOptions();
        filter.addAggLevel(AggLevel.AGG_LEVEL_NONE);
        filter.setTopologyId("Topo-m2");
        list = getMetricsFromScan(filter);
        Assert.assertEquals(1, list.size());
        Assert.assertTrue(list.contains(m4));

        // validate search by metric id
        filter = new FilterOptions();
        filter.addAggLevel(AggLevel.AGG_LEVEL_NONE);
        filter.setMetricName("metricType2");
        list = getMetricsFromScan(filter);
        Assert.assertEquals(1, list.size());
        Assert.assertTrue(list.contains(m2));

        // validate search by component id
        filter = new FilterOptions();
        filter.addAggLevel(AggLevel.AGG_LEVEL_NONE);
        filter.setComponentId("component-2");
        list = getMetricsFromScan(filter);
        Assert.assertEquals(2, list.size());
        Assert.assertTrue(list.contains(m3));
        Assert.assertTrue(list.contains(m4));

        // validate search by executor id
        filter = new FilterOptions();
        filter.addAggLevel(AggLevel.AGG_LEVEL_NONE);
        filter.setExecutorId("executor-1");
        list = getMetricsFromScan(filter);
        Assert.assertEquals(3, list.size());
        Assert.assertTrue(list.contains(m2));
        Assert.assertTrue(list.contains(m3));
        Assert.assertTrue(list.contains(m4));

        // validate search by host id
        filter = new FilterOptions();
        filter.addAggLevel(AggLevel.AGG_LEVEL_NONE);
        filter.setHostId("hostname-2");
        list = getMetricsFromScan(filter);
        Assert.assertEquals(2, list.size());
        Assert.assertTrue(list.contains(m2));
        Assert.assertTrue(list.contains(m4));

        // validate search by port
        filter = new FilterOptions();
        filter.addAggLevel(AggLevel.AGG_LEVEL_NONE);
        filter.setPort(1);
        list = getMetricsFromScan(filter);
        Assert.assertEquals(3, list.size());
        Assert.assertTrue(list.contains(m1));
        Assert.assertTrue(list.contains(m2));
        Assert.assertTrue(list.contains(m3));

        // validate search by stream id
        filter = new FilterOptions();
        filter.addAggLevel(AggLevel.AGG_LEVEL_NONE);
        filter.setStreamId("stream-4");
        list = getMetricsFromScan(filter);
        Assert.assertEquals(1, list.size());
        Assert.assertTrue(list.contains(m4));

        // validate 4 metrics (aggregations) found for m4 for all agglevels when searching by port
        filter = new FilterOptions();
        filter.setPort(2);
        list = getMetricsFromScan(filter);
        Assert.assertEquals(4, list.size());
        Assert.assertTrue(list.contains(m4));
        Assert.assertFalse(list.contains(m1));
        Assert.assertFalse(list.contains(m2));
        Assert.assertFalse(list.contains(m3));

        // validate search by topology id and executor id
        filter = new FilterOptions();
        filter.addAggLevel(AggLevel.AGG_LEVEL_NONE);
        filter.setTopologyId("Topo-m1");
        filter.setExecutorId("executor-1");
        list = getMetricsFromScan(filter);
        Assert.assertEquals(2, list.size());
        Assert.assertTrue(list.contains(m2));
        Assert.assertTrue(list.contains(m3));
    }

    /**
     * Validates that purging removes metrics older than the retention period
     * while keeping recent ones, even when old and new metrics share
     * metadata strings (topology id / stream id).
     */
    @Test
    public void testMetricCleanup() throws Exception {
        FilterOptions filter;
        List<Metric> list;

        // Share some common metadata strings to validate they do not get deleted
        String commonTopologyId = "topology-cleanup-2";
        String commonStreamId = "stream-cleanup-5";
        String defaultS = "default";
        Metric m1 = new Metric(defaultS, 40000000L, commonTopologyId, 1.0,
                "component-1", defaultS, "hostname-1", commonStreamId, 1, AggLevel.AGG_LEVEL_NONE);
        Metric m2 = new Metric(defaultS, System.currentTimeMillis(), commonTopologyId, 1.0,
                "component-1", "executor-1", defaultS, commonStreamId, 1, AggLevel.AGG_LEVEL_NONE);

        store.insert(m1);
        store.insert(m2);
        waitForInsertFinish(m2);

        // validate at least two agg level none metrics exist
        filter = new FilterOptions();
        filter.addAggLevel(AggLevel.AGG_LEVEL_NONE);
        list = getMetricsFromScan(filter);
        Assert.assertTrue(list.size() >= 2);

        // delete anything older than an hour
        MetricsCleaner cleaner = new MetricsCleaner((RocksDbStore) store, 1, 1, null);
        cleaner.purgeMetrics();
        list = getMetricsFromScan(filter);
        Assert.assertEquals(1, list.size());
        Assert.assertTrue(list.contains(m2));
    }

    /**
     * Polls until the asynchronously inserted metric becomes readable.
     *
     * @param m the last metric inserted
     * @throws Exception if the metric is not visible within ~5 seconds
     */
    private void waitForInsertFinish(Metric m) throws Exception {
        Metric last = new Metric(m);
        int attempts = 0;
        do {
            Thread.sleep(1);
            attempts++;
            if (attempts > 5000) {
                throw new Exception("Insertion timing out");
            }
        } while (!store.populateValue(last));
    }
}
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore.rocksdb; + +import org.apache.storm.metricstore.AggLevel; +import org.apache.storm.metricstore.Metric; +import org.apache.storm.metricstore.MetricException; +import org.junit.Assert; +import org.junit.Test; + +public class RocksDbValueTest { + + @Test + public void testMetadataConstructor() { + long timestamp = System.currentTimeMillis(); + String s = "MyTopology123"; + RocksDbValue value = new RocksDbValue(timestamp, s); + Assert.assertEquals(timestamp, value.getLastTimestamp()); + Assert.assertEquals(s, value.getMetdataString()); + + RocksDbValue value2 = new RocksDbValue(value.getRaw()); + Assert.assertEquals(timestamp, value2.getLastTimestamp()); + Assert.assertEquals(s, value2.getMetdataString()); + + int stringId = 0x509; + RocksDbKey key = new RocksDbKey(KeyType.EXEC_ID_STRING, stringId); + StringMetadata metadata = value2.getStringMetadata(key); + Assert.assertEquals(stringId, metadata.getStringId()); + Assert.assertEquals(timestamp, metadata.getLastTimestamp()); + Assert.assertEquals(1, metadata.getMetadataTypes().size()); + Assert.assertEquals(KeyType.EXEC_ID_STRING, metadata.getMetadataTypes().get(0)); + } + + @Test + public void testMetricConstructor() throws MetricException { + Metric m = new Metric("cpu", 1L,"myTopologyId123", 1, + "componentId1", "executorId1", "hostname1", "streamid1", + 7777, AggLevel.AGG_LEVEL_NONE); + Metric m2 = new Metric(m); + Metric m3 = new Metric(m); + + m.addValue(238); + + RocksDbValue value = new RocksDbValue(m); + value.populateMetric(m2); + 
Assert.assertEquals(m.getValue(), m2.getValue(), 0x001); + Assert.assertEquals(m.getCount(), m2.getCount(), 0x001); + Assert.assertEquals(m.getSum(), m2.getSum(), 0x001); + Assert.assertEquals(m.getMin(), m2.getMin(), 0x001); + Assert.assertEquals(m.getMax(), m2.getMax(), 0x001); + + RocksDbValue value2 = new RocksDbValue(value.getRaw()); + value2.populateMetric(m3); + Assert.assertEquals(m.getValue(), m3.getValue(), 0x001); + Assert.assertEquals(m.getCount(), m3.getCount(), 0x001); + Assert.assertEquals(m.getSum(), m3.getSum(), 0x001); + Assert.assertEquals(m.getMin(), m3.getMin(), 0x001); + Assert.assertEquals(m.getMax(), m3.getMax(), 0x001); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/StringMetadataCacheTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/StringMetadataCacheTest.java b/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/StringMetadataCacheTest.java new file mode 100644 index 0000000..6ab5991 --- /dev/null +++ b/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/StringMetadataCacheTest.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore.rocksdb; + +import org.apache.storm.metricstore.MetricException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.rocksdb.RocksDB; + +public class StringMetadataCacheTest { + + @Before + public void setUp() { + // remove any previously created cache instance + StringMetadataCache.cleanUp(); + RocksDB.loadLibrary(); + } + + private class TestDbWriter extends RocksDbMetricsWriter { + boolean evictCalled = false; + + TestDbWriter() { + super(null, null, null); + } + + @Override + void handleEvictedMetadata(RocksDbKey key, RocksDbValue val) { + evictCalled = true; + } + } + + @After + public void tearDown() { + StringMetadataCache.cleanUp(); + } + + @Test + public void validateEviction() throws MetricException { + TestDbWriter writer = new TestDbWriter(); + StringMetadataCache.init(writer, 2); + WritableStringMetadataCache wCache = StringMetadataCache.getWritableStringMetadataCache(); + ReadOnlyStringMetadataCache rCache = StringMetadataCache.getReadOnlyStringMetadataCache(); + + String s1 = "string1"; + Integer s1Id = 1; + long s1Timestamp = 1L; + StringMetadata metadata1 = new StringMetadata(KeyType.STREAM_ID_STRING, s1Id, s1Timestamp); + wCache.put(s1, metadata1, false); + Assert.assertEquals(metadata1, rCache.get(s1)); + Assert.assertTrue(rCache.contains(s1Id)); + Assert.assertEquals(s1, rCache.getMetadataString(s1Id)); + + String s2 = "string2"; + Integer s2Id = 2; + long s2Timestamp = 2L; + StringMetadata metadata2 = new StringMetadata(KeyType.EXEC_ID_STRING, s2Id, s2Timestamp); + wCache.put(s2, metadata2, false); + Assert.assertEquals(metadata2, rCache.get(s2)); + Assert.assertTrue(rCache.contains(s2Id)); + Assert.assertEquals(s2, rCache.getMetadataString(s2Id)); + + Assert.assertEquals(false, writer.evictCalled); + + // read s1 last.... 
This should cause s2 to be evicted on next put + rCache.get(s1); + + String s3 = "string3"; + Integer s3Id = 3; + long s3Timestamp = 3L; + StringMetadata metadata3 = new StringMetadata(KeyType.TOPOLOGY_STRING, s3Id, s3Timestamp); + wCache.put(s3, metadata3, false); + + Assert.assertEquals(true, writer.evictCalled); + Assert.assertEquals(metadata3, rCache.get(s3)); + Assert.assertTrue(rCache.contains(s3Id)); + Assert.assertEquals(s3, rCache.getMetadataString(s3Id)); + + // since s2 read last, it should be evicted, s1 and s3 should exist + Assert.assertEquals(null, rCache.get(s2)); + Assert.assertFalse(rCache.contains(s2Id)); + Assert.assertEquals(metadata1, rCache.get(s1)); + Assert.assertTrue(rCache.contains(s1Id)); + Assert.assertEquals(s1, rCache.getMetadataString(s1Id)); + + StringMetadataCache.cleanUp(); + } + + @Test + public void validateMultipleKeyTypes() throws MetricException { + TestDbWriter writer = new TestDbWriter(); + StringMetadataCache.init(writer, 2); + WritableStringMetadataCache wCache = StringMetadataCache.getWritableStringMetadataCache(); + + StringMetadata metadata = new StringMetadata(KeyType.STREAM_ID_STRING, 1, 1L); + wCache.put("default", metadata, false); + + metadata = wCache.get("default"); + metadata.update(3L, KeyType.COMPONENT_STRING); + + metadata = wCache.get("default"); + metadata.update(2L, KeyType.STREAM_ID_STRING); + + metadata = wCache.get("default"); + Assert.assertEquals(2, metadata.getMetadataTypes().size()); + Assert.assertTrue(metadata.getMetadataTypes().contains(KeyType.STREAM_ID_STRING)); + Assert.assertTrue(metadata.getMetadataTypes().contains(KeyType.COMPONENT_STRING)); + Assert.assertEquals(3L, metadata.getLastTimestamp()); + + StringMetadataCache.cleanUp(); + } +}
