Repository: storm
Updated Branches:
  refs/heads/master 879cb5b8f -> e6a423dd8


http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbStoreTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbStoreTest.java
 
b/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbStoreTest.java
new file mode 100644
index 0000000..63df80a
--- /dev/null
+++ 
b/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbStoreTest.java
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.metricstore.AggLevel;
+import org.apache.storm.metricstore.FilterOptions;
+import org.apache.storm.metricstore.Metric;
+import org.apache.storm.metricstore.MetricException;
+import org.apache.storm.metricstore.MetricStore;
+import org.apache.storm.metricstore.MetricStoreConfig;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RocksDbStoreTest {
+    private final static Logger LOG = 
LoggerFactory.getLogger(RocksDbStoreTest.class);
+    static MetricStore store;
+    static Path tempDirForTest;
+
+    /**
+     * Prepares the metric store shared by every test in this class: a store configured
+     * through {@link MetricStoreConfig} whose RocksDB location is a freshly created
+     * temporary directory.
+     *
+     * @throws MetricException propagated from the store configuration
+     * @throws IOException if the temporary directory cannot be created
+     */
+    @BeforeClass
+    public static void setUp() throws MetricException, IOException {
+        // remove any previously created cache instance
+        StringMetadataCache.cleanUp();
+
+        tempDirForTest = Files.createTempDirectory("RocksDbStoreTest");
+
+        Map<String, Object> storeConf = new HashMap<>();
+        storeConf.put(DaemonConfig.STORM_METRIC_STORE_CLASS, "org.apache.storm.metricstore.rocksdb.RocksDbStore");
+        storeConf.put(DaemonConfig.STORM_ROCKSDB_LOCATION, tempDirForTest.toString());
+        storeConf.put(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING, true);
+        storeConf.put(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY, 4000);
+        storeConf.put(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS, 240);
+
+        store = MetricStoreConfig.configure(storeConf);
+    }
+
+    /**
+     * Releases everything created by {@code setUp()}: closes the store, resets the
+     * string metadata cache and deletes the temporary database directory.
+     *
+     * <p>The cleanup runs in a {@code finally} block so that a failure while closing the
+     * store cannot leave the cache instance or the temporary directory behind, and each
+     * resource is null-checked because {@code setUp()} may have failed before creating it.
+     *
+     * @throws IOException if the temporary directory cannot be deleted
+     */
+    @AfterClass
+    public static void tearDown() throws IOException {
+        try {
+            if (store != null) {
+                store.close();
+            }
+        } finally {
+            StringMetadataCache.cleanUp();
+            // setUp() may have thrown before the directory was created
+            if (tempDirForTest != null) {
+                FileUtils.deleteDirectory(tempDirForTest.toFile());
+            }
+        }
+    }
+
+    /**
+     * Inserts twenty metrics spaced one minute apart (values 5 through 24) and checks
+     * the sum, value, min, max and count reported by the store for the raw metric and
+     * for the 1, 10 and 60 minute aggregation levels.
+     */
+    @Test
+    public void testAggregation() throws Exception {
+        double sum0 = 0.0;
+        double sum1 = 0.0;
+        double sum10 = 0.0;
+        double sum60 = 0.0;
+        Metric toPopulate = null;
+
+        for (int minute = 0; minute < 20; minute++) {
+            double value = 5 + minute;
+            long timestamp = 1L + minute * 60 * 1000;
+            Metric inserted = new Metric("cpu", timestamp, "myTopologyId123", value,
+                    "componentId1", "executorId1", "hostname1", "streamid1", 7777, AggLevel.AGG_LEVEL_NONE);
+            toPopulate = new Metric(inserted);
+            store.insert(inserted);
+
+            // every metric falls into the 60 minute bucket that starts at time zero
+            sum60 += value;
+            if (timestamp < 600 * 1000) {
+                sum10 += value;
+            }
+            if (timestamp < 60 * 1000) {
+                sum1 += value;
+                sum0 += value;
+            }
+        }
+
+        waitForInsertFinish(toPopulate);
+
+        // raw (non-aggregated) metric at the first timestamp
+        toPopulate.setTimestamp(1L);
+        toPopulate.setAggLevel(AggLevel.AGG_LEVEL_NONE);
+        Assert.assertTrue(store.populateValue(toPopulate));
+        Assert.assertEquals(sum0, toPopulate.getSum(), 0.001);
+        Assert.assertEquals(sum0, toPopulate.getValue(), 0.001);
+        Assert.assertEquals(5.0, toPopulate.getMin(), 0.001);
+        Assert.assertEquals(5.0, toPopulate.getMax(), 0.001);
+        Assert.assertEquals(1, toPopulate.getCount());
+
+        // 1 minute bucket: holds only the first metric
+        toPopulate.setTimestamp(0L);
+        toPopulate.setAggLevel(AggLevel.AGG_LEVEL_1_MIN);
+        Assert.assertTrue(store.populateValue(toPopulate));
+        Assert.assertEquals(sum1, toPopulate.getSum(), 0.001);
+        Assert.assertEquals(sum1, toPopulate.getValue(), 0.001);
+        Assert.assertEquals(5.0, toPopulate.getMin(), 0.001);
+        Assert.assertEquals(5.0, toPopulate.getMax(), 0.001);
+        Assert.assertEquals(1, toPopulate.getCount());
+
+        // 10 minute bucket: holds the first ten metrics
+        toPopulate.setTimestamp(0L);
+        toPopulate.setAggLevel(AggLevel.AGG_LEVEL_10_MIN);
+        Assert.assertTrue(store.populateValue(toPopulate));
+        Assert.assertEquals(sum10, toPopulate.getSum(), 0.001);
+        Assert.assertEquals(sum10 / 10.0, toPopulate.getValue(), 0.001);
+        Assert.assertEquals(5.0, toPopulate.getMin(), 0.001);
+        Assert.assertEquals(14.0, toPopulate.getMax(), 0.001);
+        Assert.assertEquals(10, toPopulate.getCount());
+
+        // 60 minute bucket: holds all twenty metrics
+        toPopulate.setTimestamp(0L);
+        toPopulate.setAggLevel(AggLevel.AGG_LEVEL_60_MIN);
+        Assert.assertTrue(store.populateValue(toPopulate));
+        Assert.assertEquals(sum60, toPopulate.getSum(), 0.001);
+        Assert.assertEquals(sum60 / 20.0, toPopulate.getValue(), 0.001);
+        Assert.assertEquals(5.0, toPopulate.getMin(), 0.001);
+        Assert.assertEquals(24.0, toPopulate.getMax(), 0.001);
+        Assert.assertEquals(20, toPopulate.getCount());
+    }
+
+    /**
+     * Verifies that {@code populateValue} returns false for a metric whose topology id
+     * was never inserted into the store.
+     */
+    @Test
+    public void testPopulateFailure() throws Exception {
+        Metric stored = new Metric("cpu", 3000L, "myTopologyId456", 1.0,
+                "componentId2", "executorId2", "hostname2", "streamid2", 7778, AggLevel.AGG_LEVEL_NONE);
+        store.insert(stored);
+        waitForInsertFinish(stored);
+
+        // identical to the stored metric except for the topology id
+        Metric unknown = new Metric(stored);
+        unknown.setTopologyId("somethingBogus");
+        Assert.assertFalse(store.populateValue(unknown));
+    }
+
+    /**
+     * Scans the shared store with the given filter and collects every metric handed to
+     * the scan callback.
+     *
+     * @param filter the options passed to the store scan
+     * @return the metrics reported by the scan, in callback order
+     * @throws MetricException propagated from the store scan
+     */
+    private List<Metric> getMetricsFromScan(FilterOptions filter) throws MetricException {
+        List<Metric> results = new ArrayList<>();
+        store.scan(filter, results::add);
+        return results;
+    }
+
+    /**
+     * Inserts four metrics that differ in metric name, timestamp, topology, component,
+     * executor, host, stream and port, then checks that scanning with each individual
+     * filter option (and one combination) returns exactly the expected metrics.
+     *
+     * <p>NOTE(review): most filters below are bounded neither by topology nor by time and
+     * the store is shared by all tests in this class, so the expected counts presumably
+     * rely on no other test leaving matching AGG_LEVEL_NONE metrics behind - verify if
+     * this test becomes order dependent.
+     */
+    @Test
+    public void testScan() throws Exception {
+        FilterOptions filter;
+        List<Metric> list;
+
+        Metric m1 = new Metric("metricType1", 50000000L, "Topo-m1", 1.0,
+                "component-1", "executor-2", "hostname-1", "stream-1", 1, AggLevel.AGG_LEVEL_NONE);
+        Metric m2 = new Metric("metricType2", 50030000L, "Topo-m1", 1.0,
+                "component-1", "executor-1", "hostname-2", "stream-2", 1, AggLevel.AGG_LEVEL_NONE);
+        Metric m3 = new Metric("metricType3", 50200000L, "Topo-m1", 1.0,
+                "component-2", "executor-1", "hostname-1", "stream-3", 1, AggLevel.AGG_LEVEL_NONE);
+        Metric m4 = new Metric("metricType4", 50200000L, "Topo-m2", 1.0,
+                "component-2", "executor-1", "hostname-2", "stream-4", 2, AggLevel.AGG_LEVEL_NONE);
+        store.insert(m1);
+        store.insert(m2);
+        store.insert(m3);
+        store.insert(m4);
+        waitForInsertFinish(m4);
+
+        // validate search by time
+        filter = new FilterOptions();
+        filter.addAggLevel(AggLevel.AGG_LEVEL_NONE);
+        filter.setStartTime(50000000L);
+        filter.setEndTime(50130000L);
+        list = getMetricsFromScan(filter);
+        Assert.assertEquals(2, list.size());
+        Assert.assertTrue(list.contains(m1));
+        Assert.assertTrue(list.contains(m2));
+
+        // validate search by topology id
+        filter = new FilterOptions();
+        filter.addAggLevel(AggLevel.AGG_LEVEL_NONE);
+        filter.setTopologyId("Topo-m2");
+        list = getMetricsFromScan(filter);
+        Assert.assertEquals(1, list.size());
+        Assert.assertTrue(list.contains(m4));
+
+        // validate search by metric name
+        filter = new FilterOptions();
+        filter.addAggLevel(AggLevel.AGG_LEVEL_NONE);
+        filter.setMetricName("metricType2");
+        list = getMetricsFromScan(filter);
+        Assert.assertEquals(1, list.size());
+        Assert.assertTrue(list.contains(m2));
+
+        // validate search by component id
+        filter = new FilterOptions();
+        filter.addAggLevel(AggLevel.AGG_LEVEL_NONE);
+        filter.setComponentId("component-2");
+        list = getMetricsFromScan(filter);
+        Assert.assertEquals(2, list.size());
+        Assert.assertTrue(list.contains(m3));
+        Assert.assertTrue(list.contains(m4));
+
+        // validate search by executor id
+        filter = new FilterOptions();
+        filter.addAggLevel(AggLevel.AGG_LEVEL_NONE);
+        filter.setExecutorId("executor-1");
+        list = getMetricsFromScan(filter);
+        Assert.assertEquals(3, list.size());
+        Assert.assertTrue(list.contains(m2));
+        Assert.assertTrue(list.contains(m3));
+        Assert.assertTrue(list.contains(m4));
+
+        // validate search by host id
+        filter = new FilterOptions();
+        filter.addAggLevel(AggLevel.AGG_LEVEL_NONE);
+        filter.setHostId("hostname-2");
+        list = getMetricsFromScan(filter);
+        Assert.assertEquals(2, list.size());
+        Assert.assertTrue(list.contains(m2));
+        Assert.assertTrue(list.contains(m4));
+
+        // validate search by port
+        filter = new FilterOptions();
+        filter.addAggLevel(AggLevel.AGG_LEVEL_NONE);
+        filter.setPort(1);
+        list = getMetricsFromScan(filter);
+        Assert.assertEquals(3, list.size());
+        Assert.assertTrue(list.contains(m1));
+        Assert.assertTrue(list.contains(m2));
+        Assert.assertTrue(list.contains(m3));
+
+        // validate search by stream id
+        filter = new FilterOptions();
+        filter.addAggLevel(AggLevel.AGG_LEVEL_NONE);
+        filter.setStreamId("stream-4");
+        list = getMetricsFromScan(filter);
+        Assert.assertEquals(1, list.size());
+        Assert.assertTrue(list.contains(m4));
+
+        // validate 4 metrics (aggregations) found for m4 for all agglevels when searching by port
+        filter = new FilterOptions();
+        filter.setPort(2);
+        list = getMetricsFromScan(filter);
+        Assert.assertEquals(4, list.size());
+        Assert.assertTrue(list.contains(m4));
+        Assert.assertFalse(list.contains(m1));
+        Assert.assertFalse(list.contains(m2));
+        Assert.assertFalse(list.contains(m3));
+
+        // validate search by topology id and executor id
+        filter = new FilterOptions();
+        filter.addAggLevel(AggLevel.AGG_LEVEL_NONE);
+        filter.setTopologyId("Topo-m1");
+        filter.setExecutorId("executor-1");
+        list = getMetricsFromScan(filter);
+        Assert.assertEquals(2, list.size());
+        Assert.assertTrue(list.contains(m2));
+        Assert.assertTrue(list.contains(m3));
+    }
+
+    /**
+     * Inserts one metric with an old timestamp and one stamped with the current time,
+     * purges the store through {@link MetricsCleaner}, and checks that only the current
+     * metric is still returned by an AGG_LEVEL_NONE scan.
+     */
+    @Test
+    public void testMetricCleanup() throws Exception {
+        // Share some common metadata strings to validate they do not get deleted
+        String sharedTopologyId = "topology-cleanup-2";
+        String sharedStreamId = "stream-cleanup-5";
+        String defaultName = "default";
+
+        Metric staleMetric = new Metric(defaultName, 40000000L, sharedTopologyId, 1.0,
+                "component-1", defaultName, "hostname-1", sharedStreamId, 1, AggLevel.AGG_LEVEL_NONE);
+        Metric freshMetric = new Metric(defaultName, System.currentTimeMillis(), sharedTopologyId, 1.0,
+                "component-1", "executor-1", defaultName, sharedStreamId, 1, AggLevel.AGG_LEVEL_NONE);
+
+        store.insert(staleMetric);
+        store.insert(freshMetric);
+        waitForInsertFinish(freshMetric);
+
+        // validate at least two agg level none metrics exist
+        FilterOptions rawOnly = new FilterOptions();
+        rawOnly.addAggLevel(AggLevel.AGG_LEVEL_NONE);
+        List<Metric> beforePurge = getMetricsFromScan(rawOnly);
+        Assert.assertTrue(beforePurge.size() >= 2);
+
+        // delete anything older than an hour
+        MetricsCleaner cleaner = new MetricsCleaner((RocksDbStore)store, 1, 1, null);
+        cleaner.purgeMetrics();
+
+        List<Metric> afterPurge = getMetricsFromScan(rawOnly);
+        Assert.assertEquals(1, afterPurge.size());
+        Assert.assertTrue(afterPurge.contains(freshMetric));
+    }
+
+    /**
+     * Blocks until the store can populate a copy of the given metric, polling once per
+     * millisecond.
+     *
+     * @param m the metric whose insertion is awaited; it is copied, not modified directly
+     * @throws Exception with message "Insertion timing out" once more than 5000 polls
+     *     have elapsed, or whatever the sleep or the store lookup throws
+     */
+    private void waitForInsertFinish(Metric m) throws Exception {
+        Metric probe = new Metric(m);
+        for (int attempts = 1; ; attempts++) {
+            Thread.sleep(1);
+            if (attempts > 5000) {
+                throw new Exception("Insertion timing out");
+            }
+            if (store.populateValue(probe)) {
+                return;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbValueTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbValueTest.java
 
b/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbValueTest.java
new file mode 100644
index 0000000..fb05796
--- /dev/null
+++ 
b/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbValueTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import org.apache.storm.metricstore.AggLevel;
+import org.apache.storm.metricstore.Metric;
+import org.apache.storm.metricstore.MetricException;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Round-trip tests for {@link RocksDbValue}: values built from metadata strings or from
+ * metrics must report the same content after being rebuilt from their raw bytes.
+ */
+public class RocksDbValueTest {
+
+    /** Tolerance used when comparing floating point metric fields. */
+    private static final double DELTA = 0.001;
+
+    /**
+     * Builds a value from a timestamp and a metadata string, rebuilds it from its raw
+     * form, and checks the timestamp, the string and the derived {@link StringMetadata}.
+     */
+    @Test
+    public void testMetadataConstructor() {
+        long timestamp = System.currentTimeMillis();
+        String s = "MyTopology123";
+        RocksDbValue value = new RocksDbValue(timestamp, s);
+        Assert.assertEquals(timestamp, value.getLastTimestamp());
+        Assert.assertEquals(s, value.getMetdataString());
+
+        RocksDbValue value2 = new RocksDbValue(value.getRaw());
+        Assert.assertEquals(timestamp, value2.getLastTimestamp());
+        Assert.assertEquals(s, value2.getMetdataString());
+
+        int stringId = 0x509;
+        RocksDbKey key = new RocksDbKey(KeyType.EXEC_ID_STRING, stringId);
+        StringMetadata metadata = value2.getStringMetadata(key);
+        Assert.assertEquals(stringId, metadata.getStringId());
+        Assert.assertEquals(timestamp, metadata.getLastTimestamp());
+        Assert.assertEquals(1, metadata.getMetadataTypes().size());
+        Assert.assertEquals(KeyType.EXEC_ID_STRING, metadata.getMetadataTypes().get(0));
+    }
+
+    /**
+     * Builds a value from a metric holding two samples and checks that populating fresh
+     * metric copies, both from the value itself and from a value rebuilt out of its raw
+     * form, reproduces the value, count, sum, min and max.
+     *
+     * <p>Floating point fields are compared with a tolerance of {@code 0.001} (the
+     * previous delta literal {@code 0x001} is the integer 1, which accepted errors of up
+     * to 1.0) and the count is compared exactly.
+     */
+    @Test
+    public void testMetricConstructor() throws MetricException {
+        Metric m = new Metric("cpu", 1L, "myTopologyId123", 1,
+                "componentId1", "executorId1", "hostname1", "streamid1",
+                7777, AggLevel.AGG_LEVEL_NONE);
+        Metric m2 = new Metric(m);
+        Metric m3 = new Metric(m);
+
+        m.addValue(238);
+
+        RocksDbValue value = new RocksDbValue(m);
+        value.populateMetric(m2);
+        Assert.assertEquals(m.getValue(), m2.getValue(), DELTA);
+        Assert.assertEquals(m.getCount(), m2.getCount());
+        Assert.assertEquals(m.getSum(), m2.getSum(), DELTA);
+        Assert.assertEquals(m.getMin(), m2.getMin(), DELTA);
+        Assert.assertEquals(m.getMax(), m2.getMax(), DELTA);
+
+        RocksDbValue value2 = new RocksDbValue(value.getRaw());
+        value2.populateMetric(m3);
+        Assert.assertEquals(m.getValue(), m3.getValue(), DELTA);
+        Assert.assertEquals(m.getCount(), m3.getCount());
+        Assert.assertEquals(m.getSum(), m3.getSum(), DELTA);
+        Assert.assertEquals(m.getMin(), m3.getMin(), DELTA);
+        Assert.assertEquals(m.getMax(), m3.getMax(), DELTA);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/StringMetadataCacheTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/StringMetadataCacheTest.java
 
b/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/StringMetadataCacheTest.java
new file mode 100644
index 0000000..6ab5991
--- /dev/null
+++ 
b/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/StringMetadataCacheTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import org.apache.storm.metricstore.MetricException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.rocksdb.RocksDB;
+
+/**
+ * Tests for {@link StringMetadataCache}: eviction once the configured capacity is
+ * exceeded, and tracking of several key types for a single metadata string.
+ */
+public class StringMetadataCacheTest {
+
+    /**
+     * Writer stub that only records whether the cache reported an evicted entry.
+     * Declared static because it needs no state from the enclosing test instance.
+     */
+    private static class TestDbWriter extends RocksDbMetricsWriter {
+        boolean evictCalled = false;
+
+        TestDbWriter() {
+            super(null, null, null);
+        }
+
+        @Override
+        void handleEvictedMetadata(RocksDbKey key, RocksDbValue val) {
+            evictCalled = true;
+        }
+    }
+
+    @Before
+    public void setUp() {
+        // remove any previously created cache instance
+        StringMetadataCache.cleanUp();
+        RocksDB.loadLibrary();
+    }
+
+    /** Resets the cache after every test, so the tests need no cleanup of their own. */
+    @After
+    public void tearDown() {
+        StringMetadataCache.cleanUp();
+    }
+
+    /**
+     * Fills a cache of capacity two, reads the first entry again, adds a third entry and
+     * checks that the second entry was evicted, that the writer was notified, and that
+     * the first and third entries are still present.
+     */
+    @Test
+    public void validateEviction() throws MetricException {
+        TestDbWriter writer = new TestDbWriter();
+        StringMetadataCache.init(writer, 2);
+        WritableStringMetadataCache wCache = StringMetadataCache.getWritableStringMetadataCache();
+        ReadOnlyStringMetadataCache rCache = StringMetadataCache.getReadOnlyStringMetadataCache();
+
+        String s1 = "string1";
+        Integer s1Id = 1;
+        long s1Timestamp = 1L;
+        StringMetadata metadata1 = new StringMetadata(KeyType.STREAM_ID_STRING, s1Id, s1Timestamp);
+        wCache.put(s1, metadata1, false);
+        Assert.assertEquals(metadata1, rCache.get(s1));
+        Assert.assertTrue(rCache.contains(s1Id));
+        Assert.assertEquals(s1, rCache.getMetadataString(s1Id));
+
+        String s2 = "string2";
+        Integer s2Id = 2;
+        long s2Timestamp = 2L;
+        StringMetadata metadata2 = new StringMetadata(KeyType.EXEC_ID_STRING, s2Id, s2Timestamp);
+        wCache.put(s2, metadata2, false);
+        Assert.assertEquals(metadata2, rCache.get(s2));
+        Assert.assertTrue(rCache.contains(s2Id));
+        Assert.assertEquals(s2, rCache.getMetadataString(s2Id));
+
+        Assert.assertFalse(writer.evictCalled);
+
+        // read s1 last....  This should cause s2 to be evicted on next put
+        rCache.get(s1);
+
+        String s3 = "string3";
+        Integer s3Id = 3;
+        long s3Timestamp = 3L;
+        StringMetadata metadata3 = new StringMetadata(KeyType.TOPOLOGY_STRING, s3Id, s3Timestamp);
+        wCache.put(s3, metadata3, false);
+
+        Assert.assertTrue(writer.evictCalled);
+        Assert.assertEquals(metadata3, rCache.get(s3));
+        Assert.assertTrue(rCache.contains(s3Id));
+        Assert.assertEquals(s3, rCache.getMetadataString(s3Id));
+
+        // since s1 was read after s2, s2 should have been evicted; s1 and s3 should exist
+        Assert.assertNull(rCache.get(s2));
+        Assert.assertFalse(rCache.contains(s2Id));
+        Assert.assertEquals(metadata1, rCache.get(s1));
+        Assert.assertTrue(rCache.contains(s1Id));
+        Assert.assertEquals(s1, rCache.getMetadataString(s1Id));
+    }
+
+    /**
+     * Updates one cached metadata entry with a second key type and with an older
+     * timestamp, then checks that both key types are recorded and that the newest
+     * timestamp is the one retained.
+     */
+    @Test
+    public void validateMultipleKeyTypes() throws MetricException {
+        TestDbWriter writer = new TestDbWriter();
+        StringMetadataCache.init(writer, 2);
+        WritableStringMetadataCache wCache = StringMetadataCache.getWritableStringMetadataCache();
+
+        StringMetadata metadata = new StringMetadata(KeyType.STREAM_ID_STRING, 1, 1L);
+        wCache.put("default", metadata, false);
+
+        metadata = wCache.get("default");
+        metadata.update(3L, KeyType.COMPONENT_STRING);
+
+        // an update carrying an older timestamp must not lower the last timestamp
+        metadata = wCache.get("default");
+        metadata.update(2L, KeyType.STREAM_ID_STRING);
+
+        metadata = wCache.get("default");
+        Assert.assertEquals(2, metadata.getMetadataTypes().size());
+        Assert.assertTrue(metadata.getMetadataTypes().contains(KeyType.STREAM_ID_STRING));
+        Assert.assertTrue(metadata.getMetadataTypes().contains(KeyType.COMPONENT_STRING));
+        Assert.assertEquals(3L, metadata.getLastTimestamp());
+    }
+}

Reply via email to