http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java b/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java new file mode 100644 index 0000000..2f2ad76 --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore; + +import java.util.Map; +import org.apache.storm.DaemonConfig; + + +public class MetricStoreConfig { + + /** + * Configures metrics store to use the class specified in the conf. 
+ * @param conf Storm config map + * @return MetricStore prepared store + * @throws MetricException on misconfiguration + */ + public static MetricStore configure(Map conf) throws MetricException { + + try { + String storeClass = (String)conf.get(DaemonConfig.STORM_METRIC_STORE_CLASS); + MetricStore store = (MetricStore) (Class.forName(storeClass)).newInstance(); + store.prepare(conf); + return store; + } catch (Throwable t) { + throw new MetricException("Failed to create metric store", t); + } + } +} +
/**
 * Enumerates the kinds of keys stored in the database together with the single-byte tag of each kind.
 *
 * <p>{@code METADATA_STRING_START} and {@code TOPOLOGY_STRING} intentionally carry the same tag, so a reverse
 * lookup of that tag resolves to {@code TOPOLOGY_STRING}, the constant declared later.
 */
public enum KeyType {
    TOPOLOGY_BLOB(0), // reserved for saving topology data
    METADATA_STRING_START(1),
    TOPOLOGY_STRING(1),
    METRIC_STRING(2),
    COMPONENT_STRING(3),
    EXEC_ID_STRING(4),
    HOST_STRING(5),
    STREAM_ID_STRING(6),
    METADATA_STRING_END(7),
    METRIC_DATA(0x80);

    // Tag -> constant lookup, filled in declaration order so a later constant replaces an earlier one
    // that shares its tag.
    private static final Map<Byte, KeyType> MAP = buildLookup();

    private final byte value;

    KeyType(int value) {
        // Narrowing is intentional: 0x80 is stored as the byte -128.
        this.value = (byte) value;
    }

    private static Map<Byte, KeyType> buildLookup() {
        Map<Byte, KeyType> byTag = new HashMap<>();
        for (KeyType candidate : values()) {
            byTag.put(candidate.value, candidate);
        }
        return Collections.unmodifiableMap(byTag);
    }

    /**
     * Returns the byte tag written as the first byte of keys of this type.
     *
     * @return the tag of this key type
     */
    byte getValue() {
        return this.value;
    }

    /**
     * Resolves a byte tag back to its key type.
     *
     * @param value the tag to resolve
     * @return the key type registered for the tag
     * @throws RuntimeException if no key type uses the tag
     */
    static KeyType getKeyType(byte value) {
        KeyType match = MAP.get(value);
        if (match != null) {
            return match;
        }
        throw new RuntimeException("Invalid key type " + value);
    }
}
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore.rocksdb; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Meter; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.metricstore.FilterOptions; +import org.apache.storm.metricstore.MetricException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class for removing expired metrics and unused metadata from the RocksDB store. + */ +public class MetricsCleaner implements Runnable, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(MetricsCleaner.class); + private static long DEFAULT_SLEEP_MS = 4L * 60L * 60L * 1000L; + private RocksDbStore store; + private long retentionHours; + private volatile boolean shutdown = false; + private long sleepMs = DEFAULT_SLEEP_MS; + private Meter failureMeter; + private long purgeTimestamp = 0L; + + MetricsCleaner(RocksDbStore store, int retentionHours, int hourlyPeriod, Meter failureMeter) { + this.store = store; + this.retentionHours = retentionHours; + if (hourlyPeriod > 0) { + this.sleepMs = hourlyPeriod * 60L * 60L * 1000L; + } + this.failureMeter = failureMeter; + + Gauge<Long> gauge = new Gauge<Long>() { + @Override + public Long getValue() { + return purgeTimestamp; + } + }; + StormMetricsRegistry.registerProvidedGauge("MetricsCleaner:purgeTimestamp", gauge); + } + + @Override + public void close() { + shutdown = true; + } + + @Override + public void run() { + while (!shutdown) { + try { + Thread.sleep(sleepMs); + } catch (InterruptedException e) { + LOG.error("Sleep interrupted", 
e); + continue; + } + + try { + purgeMetrics(); + } catch (MetricException e) { + LOG.error("Failed to purge metrics", e); + if (this.failureMeter != null) { + this.failureMeter.mark(); + } + } + } + } + + void purgeMetrics() throws MetricException { + purgeTimestamp = System.currentTimeMillis() - this.retentionHours * 60L * 60L * 1000L; + + LOG.info("Purging metrics before {}", purgeTimestamp); + + FilterOptions filter = new FilterOptions(); + long endTime = purgeTimestamp - 1L; + filter.setEndTime(endTime); + store.deleteMetrics(filter); + + LOG.info("Purging metadata before " + purgeTimestamp); + store.deleteMetadataBefore(purgeTimestamp); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/ReadOnlyStringMetadataCache.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/ReadOnlyStringMetadataCache.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/ReadOnlyStringMetadataCache.java new file mode 100644 index 0000000..0effbc4 --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/ReadOnlyStringMetadataCache.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore.rocksdb; + +import org.apache.http.annotation.ThreadSafe; + +/** + * The read-only interface to a StringMetadataCache allowed to be used by any thread. + */ +@ThreadSafe +public interface ReadOnlyStringMetadataCache { + + /** + * Get the string metadata from the cache. + * + * @param s The string to look for + * @return the metadata associated with the string or null if not found + */ + StringMetadata get(String s); + + /** + * Returns the string matching the string Id if in the cache. + * + * @param stringId The string Id to check + * @return the associated string if the Id is in the cache, null otherwise + */ + String getMetadataString(Integer stringId); + + /** + * Determines if a string Id is contained in the cache. + * + * @param stringId The string Id to check + * @return true if the Id is in the cache, false otherwise + */ + boolean contains(Integer stringId); +} http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbKey.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbKey.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbKey.java new file mode 100644 index 0000000..7868282 --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbKey.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore.rocksdb; + +import com.google.common.primitives.UnsignedBytes; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; +import javax.xml.bind.DatatypeConverter; +import org.apache.storm.metricstore.AggLevel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Class representing the data used as a Key in RocksDB. Keys can be used either for metadata or metrics. + * + * <P>Keys are 38 bytes in size. The fields for a key are: + * <pre>< + * Field Size Offset + * + * Type 1 0 The type maps to the KeyType enum, specifying a metric or various types of metadata + * Aggregation Level 1 1 The aggregation level for a metric (see AggLevel enum). 0 for metadata. 
+ * TopologyId 4 2 The metadata string Id representing a topologyId for a metric, or the unique + * string Id for a metadata string + * Timestamp 8 6 The timestamp for a metric, unused for metadata + * MetricId 4 14 The metadata string Id for the metric name + * ComponentId 4 18 The metadata string Id for the component Id + * ExecutorId 4 22 The metadata string Id for the executor Id + * HostId 4 26 The metadata string Id for the host Id + * Port 4 30 The port number + * StreamId 4 34 The metadata string Id for the stream Id + * </pre> + */ +public class RocksDbKey implements Comparable<RocksDbKey> { + private static final Logger LOG = LoggerFactory.getLogger(RocksDbKey.class); + static final int KEY_SIZE = 38; + private static Map<Byte, RocksDbKey> PREFIX_MAP = new HashMap<>(); + private byte[] key; + + static { + // pregenerate commonly used keys for scans + for (KeyType type : EnumSet.allOf(KeyType.class)) { + RocksDbKey key = new RocksDbKey(type, 0); + PREFIX_MAP.put(type.getValue(), key); + } + PREFIX_MAP = Collections.unmodifiableMap(PREFIX_MAP); + } + + /** + * Constructor for a RocksDB key for a metadata string. + * + * @param type type of metadata string + * @param metadataStringId the string Id for the string (stored in the topologyId portion of the key) + */ + RocksDbKey(KeyType type, int metadataStringId) { + byte[] key = new byte[KEY_SIZE]; + ByteBuffer bb = ByteBuffer.wrap(key); + bb.put(type.getValue()); + bb.put(AggLevel.AGG_LEVEL_NONE.getValue()); + bb.putInt(metadataStringId); + this.key = key; + } + + /** + * Constructor for a RocksDB key from raw data. + * + * @param raw the key data + */ + RocksDbKey(byte[] raw) { + this.key = raw; + } + + + /** + * Get a zeroed key of the specified type. + * + * @param type the desired type + * @return a key of the desired type + */ + static RocksDbKey getPrefix(KeyType type) { + return PREFIX_MAP.get(type.getValue()); + } + + /** + * get the metadata string Id portion of the key for metadata keys. 
+ * + * @return the metadata string Id + * @throws RuntimeException if the key is not a metadata type + */ + int getMetadataStringId() { + if (this.getType().getValue() < KeyType.METADATA_STRING_END.getValue()) { + return ByteBuffer.wrap(key, 2, 4).getInt(); + } else { + throw new RuntimeException("Cannot fetch metadata string for key of type " + this.getType()); + } + } + + /** + * get the raw key bytes + */ + byte[] getRaw() { + return this.key; + } + + /** + * get the type of key. + * + * @return the type of key + */ + KeyType getType() { + return KeyType.getKeyType(key[0]); + } + + /** + * compares to keys on a byte by byte basis. + * + * @return comparison of key byte values + */ + @Override + public int compareTo(RocksDbKey o) { + return UnsignedBytes.lexicographicalComparator().compare(this.getRaw(), o.getRaw()); + } + + /** + * gets the first possible key value for the desired key type. + * + * @return the initial key + */ + static RocksDbKey getInitialKey(KeyType type) { + return PREFIX_MAP.get(type.getValue()); + } + + /** + * gets the key just larger than the last possible key value for the desired key type. + * + * @return the last key + */ + static RocksDbKey getLastKey(KeyType type) { + byte value = (byte)(type.getValue() + 1); + return PREFIX_MAP.get(value); + } + + /** + * Creates a metric key with the desired properties. 
+ * + * @return the generated key + */ + static RocksDbKey createMetricKey(AggLevel aggLevel, int topologyId, long metricTimestamp, int metricId, + int componentId, int executorId, int hostId, int port, + int streamId) { + byte[] raw = new byte[KEY_SIZE]; + ByteBuffer bb = ByteBuffer.wrap(raw); + bb.put(KeyType.METRIC_DATA.getValue()); + bb.put(aggLevel.getValue()); + bb.putInt(topologyId); // offset 2 + bb.putLong(metricTimestamp); // offset 6 + bb.putInt(metricId); // offset 14 + bb.putInt(componentId); // offset 18 + bb.putInt(executorId); // offset 22 + bb.putInt(hostId); // offset 26 + bb.putInt(port); // offset 30 + bb.putInt(streamId); // offset 34 + + RocksDbKey key = new RocksDbKey(raw); + return key; + } + + /** + * Get the unique string Id for a metric's topologyId. + */ + int getTopologyId() { + int val = ByteBuffer.wrap(key, 2, 4).getInt(); + return val; + } + + long getTimestamp() { + return ByteBuffer.wrap(key, 6, 8).getLong(); + } + + int getMetricId() { + return ByteBuffer.wrap(key, 14, 4).getInt(); + } + + int getComponentId() { + return ByteBuffer.wrap(key, 18, 4).getInt(); + } + + int getExecutorId() { + return ByteBuffer.wrap(key, 22, 4).getInt(); + } + + int getHostnameId() { + return ByteBuffer.wrap(key, 26, 4).getInt(); + } + + int getPort() { + return ByteBuffer.wrap(key, 30, 4).getInt(); + } + + int getStreamId() { + return ByteBuffer.wrap(key, 34, 4).getInt(); + } + + @Override + public String toString() { + return "[0x" + DatatypeConverter.printHexBinary(key) + "]"; + } +} + http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java new file mode 100644 index 0000000..a050a76 --- 
/dev/null +++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java @@ -0,0 +1,320 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore.rocksdb; + +import com.codahale.metrics.Meter; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.http.annotation.NotThreadSafe; +import org.apache.storm.metricstore.AggLevel; +import org.apache.storm.metricstore.Metric; +import org.apache.storm.metricstore.MetricException; +import org.rocksdb.FlushOptions; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class designed to perform all metrics inserts into RocksDB. Metrics are processed from a blocking queue. Inserts + * to RocksDB are done using a single thread to simplify design (such as looking up existing metric data for aggregation, + * and fetching/evicting metadata from the cache). 
+ * </P> + * A writable LRU StringMetadataCache is used to minimize looking up metadata string Ids. As entries are added to the full cache, older + * entries are evicted from the cache and need to be written to the database. This happens as the handleEvictedMetadata() + * method callback. + * </P> + * The following issues would need to be addressed to implement a multithreaded metrics writer: + * <ul> + * <li>Generation of unique unused IDs for new metadata strings needs to be thread safe.</li> + * <li>Ensuring newly created metadata strings are seen by all threads.</li> + * <li>Maintaining a properly cached state of metadata for multiple writers. The current LRU cache + * evicts data as new metadata is added.</li> + * <li>Processing the aggregation of a metric requires fetching and updating previous aggregates. A multithreaded + * design would need to ensure two metrics were not updating an aggregated metric at the same time.</li> + * <li>Investigate performance of multiple threads inserting into RocksDB versus a single ordered insert.</li> + * </ul> + */ +@NotThreadSafe +public class RocksDbMetricsWriter implements Runnable, AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(RocksDbMetricsWriter.class); + private RocksDbStore store; + private BlockingQueue queue; + private WritableStringMetadataCache stringMetadataCache; + private Set<Integer> unusedIds = new HashSet<>(); + private TreeMap<RocksDbKey, RocksDbValue> insertBatch = new TreeMap<>(); // RocksDB should insert in sorted key order + private WriteOptions writeOpts = new WriteOptions(); + private volatile boolean shutdown = false; + private Meter failureMeter; + private ArrayList<AggLevel> aggBuckets = new ArrayList<>(); + + /** + * Constructor for the RocksDbMetricsWriter. 
+ * + * @param store The RocksDB store + * @param queue The queue to receive metrics for insertion + */ + RocksDbMetricsWriter(RocksDbStore store, BlockingQueue queue, Meter failureMeter) { + this.store = store; + this.queue = queue; + this.failureMeter = failureMeter; + + aggBuckets.add(AggLevel.AGG_LEVEL_1_MIN); + aggBuckets.add(AggLevel.AGG_LEVEL_10_MIN); + aggBuckets.add(AggLevel.AGG_LEVEL_60_MIN); + } + + /** + * Init routine called once the Metadata cache has been created. + * + * @throws MetricException on cache error + */ + void init() throws MetricException { + this.stringMetadataCache = StringMetadataCache.getWritableStringMetadataCache(); + } + + /** + * Run routine to wait for metrics on a queue and insert into RocksDB. + */ + @Override + public void run() { + while (!shutdown) { + try { + Metric m = (Metric) queue.take(); + processInsert(m); + } catch (Exception e) { + LOG.error("Failed to insert metric", e); + if (this.failureMeter != null) { + this.failureMeter.mark(); + } + } + } + } + + /** + * Performs the actual metric insert, and aggregates over all bucket times. 
+ * + * @param metric Metric to store + * @throws MetricException if database write fails + */ + private void processInsert(Metric metric) throws MetricException { + + // convert all strings to numeric Ids for the metric key and add to the metadata cache + long metricTimestamp = metric.getTimestamp(); + Integer topologyId = storeMetadataString(KeyType.TOPOLOGY_STRING, metric.getTopologyId(), metricTimestamp); + Integer metricId = storeMetadataString(KeyType.METRIC_STRING, metric.getMetricName(), metricTimestamp); + Integer componentId = storeMetadataString(KeyType.COMPONENT_STRING, metric.getComponentId(), metricTimestamp); + Integer executorId = storeMetadataString(KeyType.EXEC_ID_STRING, metric.getExecutorId(), metricTimestamp); + Integer hostId = storeMetadataString(KeyType.HOST_STRING, metric.getHostname(), metricTimestamp); + Integer streamId = storeMetadataString(KeyType.STREAM_ID_STRING, metric.getStreamId(), metricTimestamp); + + RocksDbKey key = RocksDbKey.createMetricKey(AggLevel.AGG_LEVEL_NONE, topologyId, metric.getTimestamp(), metricId, + componentId, executorId, hostId, metric.getPort(), streamId); + + // save metric key/value to be batched + RocksDbValue value = new RocksDbValue(metric); + insertBatch.put(key, value); + + // Aggregate matching metrics over bucket timeframes. + // We'll process starting with the longest bucket. If the metric for this does not exist, we don't have to + // search for the remaining bucket metrics. 
+ ListIterator li = aggBuckets.listIterator(aggBuckets.size()); + boolean populate = true; + while (li.hasPrevious()) { + AggLevel bucket = (AggLevel)li.previous(); + Metric aggMetric = new Metric(metric); + aggMetric.setAggLevel(bucket); + + long msToBucket = 1000L * 60L * bucket.getValue(); + long roundedToBucket = msToBucket * (metric.getTimestamp() / msToBucket); + aggMetric.setTimestamp(roundedToBucket); + + RocksDbKey aggKey = RocksDbKey.createMetricKey(bucket, topologyId, aggMetric.getTimestamp(), metricId, + componentId, executorId, hostId, aggMetric.getPort(), streamId); + + if (populate) { + // retrieve any existing aggregation matching this one and update the values + if (store.populateFromKey(aggKey, aggMetric)) { + aggMetric.addValue(metric.getValue()); + } else { + // aggregating metric did not exist, don't look for further ones with smaller timestamps + populate = false; + } + } + + // save metric key/value to be batched + RocksDbValue aggVal = new RocksDbValue(aggMetric); + insertBatch.put(aggKey, aggVal); + } + + processBatchInsert(insertBatch); + + insertBatch.clear(); + } + + // converts a metadata string into a unique integer. Updates the timestamp of the string + // so we can track when it was last used for later deletion on database cleanup. 
+ private int storeMetadataString(KeyType type, String s, long metricTimestamp) throws MetricException { + if (s == null) { + throw new MetricException("No string for metric metadata string type " + type); + } + + // attempt to find it in the string cache + StringMetadata stringMetadata = stringMetadataCache.get(s); + if (stringMetadata != null) { + // make sure the timestamp on the metadata has the latest time + stringMetadata.update(metricTimestamp, type); + return stringMetadata.getStringId(); + } + + // attempt to find the string in the database + stringMetadata = store.rocksDbGetStringMetadata(type, s); + if (stringMetadata != null) { + // update to the latest timestamp and add to the string cache + stringMetadata.update(metricTimestamp, type); + stringMetadataCache.put(s, stringMetadata, false); + return stringMetadata.getStringId(); + } + + // string does not exist, create using an unique string id and add to cache + if (LOG.isDebugEnabled()) { + LOG.debug(type + "." + s + " does not exist in cache or database"); + } + int stringId = getUniqueMetadataStringId(); + stringMetadata = new StringMetadata(type, stringId, metricTimestamp); + stringMetadataCache.put(s, stringMetadata, true); + + return stringMetadata.getStringId(); + } + + // get a currently unused unique string id + private int getUniqueMetadataStringId() throws MetricException { + generateUniqueStringIds(); + int id = unusedIds.iterator().next(); + unusedIds.remove(id); + return id; + } + + // guarantees a list of unused string Ids exists. Once the list is empty, creates a new list + // by generating a list of random numbers and removing the ones that already are in use. 
+ private void generateUniqueStringIds() throws MetricException { + int attempts = 0; + while (unusedIds.isEmpty()) { + attempts++; + if (attempts > 100) { + String message = "Failed to generate unique ids"; + LOG.error(message); + throw new MetricException(message); + } + for (int i = 0; i < 600; i++) { + int n = ThreadLocalRandom.current().nextInt(); + if (n == RocksDbStore.INVALID_METADATA_STRING_ID) { + continue; + } + // remove any entries in the cache + if (stringMetadataCache.contains(n)) { + continue; + } + unusedIds.add(n); + } + // now scan all metadata and remove any matching string Ids from this list + RocksDbKey firstPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_START); + RocksDbKey lastPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_END); + store.scanRange(firstPrefix, lastPrefix, (key, value) -> { + unusedIds.remove(key.getMetadataStringId()); + return true; // process all metadata + }); + } + } + + // writes multiple metric values into the database as a batch operation. The tree map keeps the keys sorted + // for faster insertion to RocksDB. + private void processBatchInsert(TreeMap<RocksDbKey, RocksDbValue> batchMap) throws MetricException { + try (WriteBatch writeBatch = new WriteBatch()) { + // take the batched metric data and write to the database + for (RocksDbKey k : batchMap.keySet()) { + RocksDbValue v = batchMap.get(k); + writeBatch.put(k.getRaw(), v.getRaw()); + } + store.db.write(writeOpts, writeBatch); + } catch (Exception e) { + String message = "Failed to store data to RocksDB"; + LOG.error(message, e); + throw new MetricException(message, e); + } + } + + // evicted metadata needs to be stored immediately. Metadata lookups count on it being in the cache + // or database. 
+ void handleEvictedMetadata(RocksDbKey key, RocksDbValue val) { + try { + store.db.put(key.getRaw(), val.getRaw()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + boolean isShutdown() { + return this.shutdown; + } + + @Override + public void close() { + this.shutdown = true; + + // get all metadata from the cache to put into the database + TreeMap<RocksDbKey, RocksDbValue> batchMap = new TreeMap<>(); // use a new map to prevent threading issues with writer thread + for (Map.Entry entry : stringMetadataCache.entrySet()) { + String metadataString = (String)entry.getKey(); + StringMetadata val = (StringMetadata)entry.getValue(); + RocksDbValue rval = new RocksDbValue(val.getLastTimestamp(), metadataString); + + for (KeyType type : val.getMetadataTypes()) { // save the metadata for all types of strings it matches + RocksDbKey rkey = new RocksDbKey(type, val.getStringId()); + batchMap.put(rkey, rval); + } + } + + try { + processBatchInsert(batchMap); + } catch (MetricException e) { + LOG.error("Failed to insert all metadata", e); + } + + // flush db to disk + try (FlushOptions flushOps = new FlushOptions()) { + flushOps.setWaitForFlush(true); + store.db.flush(flushOps); + } catch (RocksDBException e) { + LOG.error("Failed ot flush RocksDB", e); + if (this.failureMeter != null) { + this.failureMeter.mark(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java new file mode 100644 index 0000000..2f44aff --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java @@ -0,0 +1,639 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore.rocksdb; + +import com.codahale.metrics.Meter; +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.storm.DaemonConfig; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.metricstore.AggLevel; +import org.apache.storm.metricstore.FilterOptions; +import org.apache.storm.metricstore.Metric; +import org.apache.storm.metricstore.MetricException; +import org.apache.storm.metricstore.MetricStore; +import org.apache.storm.utils.ObjectReader; +import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.IndexType; +import org.rocksdb.Options; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory;

/**
 * {@link MetricStore} implementation that keeps metrics and their metadata strings in a RocksDB database.
 *
 * <p>{@link #prepare(Map)} opens the database and starts two daemon threads: a {@code RocksDbMetricsWriter}
 * that consumes the metrics queued by {@link #insert(Metric)}, and a {@code MetricsCleaner} that deletes old
 * metrics and metadata.  Metric keys reference metadata strings (topology, component, host, ...) by integer id;
 * this class translates between strings and ids when reading.
 */
public class RocksDbStore implements MetricStore, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(RocksDbStore.class);
    private static final int MAX_QUEUE_CAPACITY = 4000;
    static final int INVALID_METADATA_STRING_ID = 0;
    RocksDB db;
    private ReadOnlyStringMetadataCache readOnlyStringMetadataCache = null;
    // bounded hand-off queue between insert() callers and the writer created in prepare()
    private BlockingQueue queue = new LinkedBlockingQueue(MAX_QUEUE_CAPACITY);
    private RocksDbMetricsWriter metricsWriter = null;
    private MetricsCleaner metricsCleaner = null;
    private Meter failureMeter = null;

    interface RocksDbScanCallback {
        boolean cb(RocksDbKey key, RocksDbValue val);  // return false to stop scan
    }

    /**
     * Create metric store instance using the configurations provided via the config map.
     *
     * @param config Storm config map
     * @throws MetricException on preparation error
     */
    @Override
    public void prepare(Map config) throws MetricException {
        validateConfig(config);

        this.failureMeter = StormMetricsRegistry.registerMeter("RocksDB:metric-failures");

        RocksDB.loadLibrary();
        boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);

        try (Options options = new Options().setCreateIfMissing(createIfMissing)) {
            // use the hash index for prefix searches
            BlockBasedTableConfig tfc = new BlockBasedTableConfig();
            tfc.setIndexType(IndexType.kHashSearch);
            options.setTableFormatConfig(tfc);
            options.useCappedPrefixExtractor(RocksDbKey.KEY_SIZE);

            String path = getRocksDbAbsoluteDir(config);
            LOG.info("Opening RocksDB from {}", path);
            db = RocksDB.open(options, path);
        } catch (RocksDBException e) {
            String message = "Error opening RockDB database";
            LOG.error(message, e);
            throw new MetricException(message, e);
        }

        // create thread to delete old metrics and metadata
        int retentionHours = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS).toString());
        int deletionPeriod = 0;
        if (config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS)) {
            deletionPeriod = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS).toString());
        }
        metricsCleaner = new MetricsCleaner(this, retentionHours, deletionPeriod, failureMeter);

        // create thread to process insertion of all metrics
        metricsWriter = new RocksDbMetricsWriter(this, this.queue, this.failureMeter);

        int cacheCapacity = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY).toString());
        StringMetadataCache.init(metricsWriter, cacheCapacity);
        readOnlyStringMetadataCache = StringMetadataCache.getReadOnlyStringMetadataCache();
        metricsWriter.init(); // init the writer once the cache is setup

        // start threads after metadata cache created
        Thread thread = new Thread(metricsCleaner, "RocksDbMetricsCleaner");
        thread.setDaemon(true);
        thread.start();

        thread = new Thread(metricsWriter, "RocksDbMetricsWriter");
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * Implements configuration validation of Metrics Store, validates storm configuration for Metrics Store.
     *
     * @param config Storm config to specify which store type, location of store and creation policy
     * @throws MetricException if there is a missing required configuration or if the store does not exist but
     *                         the config specifies not to create the store
     */
    private void validateConfig(Map config) throws MetricException {
        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_LOCATION))) {
            throw new MetricException("Not a vaild RocksDB configuration - Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION);
        }

        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING))) {
            throw new MetricException("Not a vaild RocksDB configuration - Does not specify creation policy "
                    + DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING);
        }

        // validate path defined
        String storePath = getRocksDbAbsoluteDir(config);

        boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);
        if (!createIfMissing) {
            if (!(new File(storePath).exists())) {
                throw new MetricException("Configuration specifies not to create a store but no store currently exists at " + storePath);
            }
        }

        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY))) {
            throw new MetricException("Not a valid RocksDB configuration - Missing metadata string cache size "
                    + DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY);
        }

        if (!config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS)) {
            throw new MetricException("Not a valid RocksDB configuration - Missing metric retention "
                    + DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS);
        }
    }

    /**
     * Resolves the configured store location to an absolute path.  A relative location is resolved against the
     * {@code storm.home} system property.
     *
     * @param conf Storm config map
     * @return the absolute path of the RocksDB directory
     * @throws MetricException if the location is not configured, or it is relative and {@code storm.home} is unset
     */
    private String getRocksDbAbsoluteDir(Map conf) throws MetricException {
        String storePath = (String) conf.get(DaemonConfig.STORM_ROCKSDB_LOCATION);
        if (storePath == null) {
            throw new MetricException("Not a vaild RocksDB configuration - Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION);
        }
        if (new File(storePath).isAbsolute()) {
            return storePath;
        }
        String stormHome = System.getProperty("storm.home");
        if (stormHome == null) {
            throw new MetricException("storm.home not set");
        }
        return (stormHome + File.separator + storePath);
    }

    /**
     * Queues a metric for storage.  The metric is dropped (and the drop logged) when the queue is full.
     *
     * @param metric Metric to store
     * @throws MetricException if the metric could not be queued
     */
    public void insert(Metric metric) throws MetricException {
        try {
            // don't bother blocking on a full queue, just drop metrics in case we can't keep up.
            // offer() is a single non-blocking operation, so concurrent callers cannot slip between a
            // capacity check and a blocking put().
            if (!queue.offer(metric)) {
                LOG.info("Metrics q full, dropping metric");
            }
        } catch (Exception e) {
            String message = "Failed to insert metric";
            LOG.error(message, e);
            markFailure();
            throw new MetricException(message, e);
        }
    }

    /**
     * Fill out the numeric values for a metric.
     *
     * @param metric Metric to populate
     * @return true if the metric was populated, false otherwise
     * @throws MetricException if read from database fails
     */
    @Override
    public boolean populateValue(Metric metric) throws MetricException {
        Map<String, Integer> localLookupCache = new HashMap<>(6);

        int topologyId = lookupMetadataString(KeyType.TOPOLOGY_STRING, metric.getTopologyId(), localLookupCache);
        if (INVALID_METADATA_STRING_ID == topologyId) {
            return false;
        }
        int metricId = lookupMetadataString(KeyType.METRIC_STRING, metric.getMetricName(), localLookupCache);
        if (INVALID_METADATA_STRING_ID == metricId) {
            return false;
        }
        int componentId = lookupMetadataString(KeyType.COMPONENT_STRING, metric.getComponentId(), localLookupCache);
        if (INVALID_METADATA_STRING_ID == componentId) {
            return false;
        }
        int executorId = lookupMetadataString(KeyType.EXEC_ID_STRING, metric.getExecutorId(), localLookupCache);
        if (INVALID_METADATA_STRING_ID == executorId) {
            return false;
        }
        int hostId = lookupMetadataString(KeyType.HOST_STRING, metric.getHostname(), localLookupCache);
        if (INVALID_METADATA_STRING_ID == hostId) {
            return false;
        }
        int streamId = lookupMetadataString(KeyType.STREAM_ID_STRING, metric.getStreamId(), localLookupCache);
        if (INVALID_METADATA_STRING_ID == streamId) {
            return false;
        }

        RocksDbKey key = RocksDbKey.createMetricKey(metric.getAggLevel(), topologyId, metric.getTimestamp(), metricId,
                componentId, executorId, hostId, metric.getPort(), streamId);

        return populateFromKey(key, metric);
    }

    // populate metric values using the provided key
    boolean populateFromKey(RocksDbKey key, Metric metric) throws MetricException {
        try {
            byte[] value = db.get(key.getRaw());
            if (value == null) {
                return false;
            }
            RocksDbValue rdbValue = new RocksDbValue(value);
            rdbValue.populateMetric(metric);
        } catch (Exception e) {
            String message = "Failed to populate metric";
            LOG.error(message, e);
            markFailure();
            throw new MetricException(message, e);
        }
        return true;
    }

    // records a failure on the meter registered in prepare(); a no-op if prepare() has not run yet
    private void markFailure() {
        if (this.failureMeter != null) {
            this.failureMeter.mark();
        }
    }

    // attempts to lookup the unique Id for a string that may not exist yet.  Returns INVALID_METADATA_STRING_ID
    // if it does not exist.
    private int lookupMetadataString(KeyType type, String s, Map<String, Integer> lookupCache) throws MetricException {
        if (s == null) {
            markFailure();
            throw new MetricException("No string for metric metadata string type " + type);
        }

        // attempt to find it in the string cache, this will update the LRU
        StringMetadata stringMetadata = readOnlyStringMetadataCache.get(s);
        if (stringMetadata != null) {
            return stringMetadata.getStringId();
        }

        // attempt to find it in callers cache
        Integer id = lookupCache.get(s);
        if (id != null) {
            return id;
        }

        // attempt to find the string in the database
        stringMetadata = rocksDbGetStringMetadata(type, s);
        if (stringMetadata != null) {
            id = stringMetadata.getStringId();

            // add to the callers cache.  We can't add it to the stringMetadataCache, since that could cause an eviction
            // database write, which we want to only occur from the inserting DB thread.
            lookupCache.put(s, id);

            return id;
        }

        // string does not exist
        return INVALID_METADATA_STRING_ID;
    }

    // scans the database to look for a metadata string and returns the metadata info
    StringMetadata rocksDbGetStringMetadata(KeyType type, String s) {
        RocksDbKey firstKey = RocksDbKey.getInitialKey(type);
        RocksDbKey lastKey = RocksDbKey.getLastKey(type);
        final AtomicReference<StringMetadata> reference = new AtomicReference<>();
        scanRange(firstKey, lastKey, (key, value) -> {
            if (s.equals(value.getMetdataString())) {
                reference.set(value.getStringMetadata(key));
                return false;
            } else {
                return true;  // haven't found string, keep searching
            }
        });
        return reference.get();
    }

    // scans from key start to the key before end, calling back until callback indicates not to process further
    void scanRange(RocksDbKey start, RocksDbKey end, RocksDbScanCallback fn) {
        try (ReadOptions ro = new ReadOptions()) {
            ro.setTotalOrderSeek(true);
            // the iterator holds a native handle; close it on every exit path (early return or exception)
            try (RocksIterator iterator = db.newIterator(ro)) {
                for (iterator.seek(start.getRaw()); iterator.isValid(); iterator.next()) {
                    RocksDbKey key = new RocksDbKey(iterator.key());
                    if (key.compareTo(end) >= 0) { // past limit, quit
                        return;
                    }

                    RocksDbValue val = new RocksDbValue(iterator.value());
                    if (!fn.cb(key, val)) {
                        // if cb returns false, we are done with this section of rows
                        return;
                    }
                }
            }
        }
    }

    /**
     * Shutdown the store.  Safe to call even if {@link #prepare(Map)} did not complete.
     */
    @Override
    public void close() {
        if (metricsWriter != null) {
            metricsWriter.close();
        }
        if (metricsCleaner != null) {
            metricsCleaner.close();
        }
    }

    /**
     * Scans all metrics in the store and returns the ones matching the specified filtering options.
     * Callback returns Metric class results.
     *
     * @param filter options to filter by
     * @param scanCallback callback for each Metric found
     * @throws MetricException on error
     */
    public void scan(FilterOptions filter, ScanCallback scanCallback) throws MetricException {
        scanInternal(filter, scanCallback, null);
    }

    /**
     * Scans all metrics in the store and returns the ones matching the specified filtering options.
     * Callback returns raw key/value data.
     *
     * @param filter options to filter by
     * @param rawCallback callback for each Metric found
     * @throws MetricException on error
     */
    private void scanRaw(FilterOptions filter, RocksDbScanCallback rawCallback) throws MetricException {
        scanInternal(filter, null, rawCallback);
    }

    // perform a scan given filter options, and return results in either Metric or raw data.
    // Exactly one of scanCallback / rawCallback is expected to be non-null (see scan() and scanRaw()).
    private void scanInternal(FilterOptions filter, ScanCallback scanCallback, RocksDbScanCallback rawCallback) throws MetricException {

        Map<String, Integer> stringToIdCache = new HashMap<>();
        Map<Integer, String> idToStringCache = new HashMap<>();

        // For every filterable field: no filter leaves the [0, 0xFFFFFFFF] range; a filter pins start == end to
        // the string's id.  A filter string unknown to the store means no metric can match, so return at once.
        int startTopologyId = 0;
        int endTopologyId = 0xFFFFFFFF;
        String filterTopologyId = filter.getTopologyId();
        if (filterTopologyId != null) {
            int topologyId = lookupMetadataString(KeyType.TOPOLOGY_STRING, filterTopologyId, stringToIdCache);
            if (INVALID_METADATA_STRING_ID == topologyId) {
                return;  // string does not exist in database
            }
            startTopologyId = topologyId;
            endTopologyId = topologyId;
        }

        long startTime = filter.getStartTime();
        long endTime = filter.getEndTime();

        int startMetricId = 0;
        int endMetricId = 0xFFFFFFFF;
        String filterMetricName = filter.getMetricName();
        if (filterMetricName != null) {
            int metricId = lookupMetadataString(KeyType.METRIC_STRING, filterMetricName, stringToIdCache);
            if (INVALID_METADATA_STRING_ID == metricId) {
                return;  // string does not exist in database
            }
            startMetricId = metricId;
            endMetricId = metricId;
        }

        int startComponentId = 0;
        int endComponentId = 0xFFFFFFFF;
        String filterComponentId = filter.getComponentId();
        if (filterComponentId != null) {
            int componentId = lookupMetadataString(KeyType.COMPONENT_STRING, filterComponentId, stringToIdCache);
            if (INVALID_METADATA_STRING_ID == componentId) {
                return;  // string does not exist in database
            }
            startComponentId = componentId;
            endComponentId = componentId;
        }

        int startExecutorId = 0;
        int endExecutorId = 0xFFFFFFFF;
        String filterExecutorName = filter.getExecutorId();
        if (filterExecutorName != null) {
            int executorId = lookupMetadataString(KeyType.EXEC_ID_STRING, filterExecutorName, stringToIdCache);
            if (INVALID_METADATA_STRING_ID == executorId) {
                return;  // string does not exist in database
            }
            startExecutorId = executorId;
            endExecutorId = executorId;
        }

        int startHostId = 0;
        int endHostId = 0xFFFFFFFF;
        String filterHostId = filter.getHostId();
        if (filterHostId != null) {
            int hostId = lookupMetadataString(KeyType.HOST_STRING, filterHostId, stringToIdCache);
            if (INVALID_METADATA_STRING_ID == hostId) {
                return;  // string does not exist in database
            }
            startHostId = hostId;
            endHostId = hostId;
        }

        int startPort = 0;
        int endPort = 0xFFFFFFFF;
        Integer filterPort = filter.getPort();
        if (filterPort != null) {
            startPort = filterPort;
            endPort = filterPort;
        }

        int startStreamId = 0;
        int endStreamId = 0xFFFFFFFF;
        String filterStreamId = filter.getStreamId();
        if (filterStreamId != null) {
            // stream ids are stored under STREAM_ID_STRING keys (as in populateValue), not HOST_STRING
            int streamId = lookupMetadataString(KeyType.STREAM_ID_STRING, filterStreamId, stringToIdCache);
            if (INVALID_METADATA_STRING_ID == streamId) {
                return;  // string does not exist in database
            }
            startStreamId = streamId;
            endStreamId = streamId;
        }

        // try-with-resources releases the native ReadOptions/RocksIterator handles on every exit path,
        // including the early return when the raw callback stops the scan and any exception
        try (ReadOptions ro = new ReadOptions()) {
            ro.setTotalOrderSeek(true);

            for (AggLevel aggLevel : filter.getAggLevels()) {

                RocksDbKey startKey = RocksDbKey.createMetricKey(aggLevel, startTopologyId, startTime, startMetricId,
                        startComponentId, startExecutorId, startHostId, startPort, startStreamId);
                RocksDbKey endKey = RocksDbKey.createMetricKey(aggLevel, endTopologyId, endTime, endMetricId,
                        endComponentId, endExecutorId, endHostId, endPort, endStreamId);

                try (RocksIterator iterator = db.newIterator(ro)) {
                    for (iterator.seek(startKey.getRaw()); iterator.isValid(); iterator.next()) {
                        RocksDbKey key = new RocksDbKey(iterator.key());

                        if (key.compareTo(endKey) > 0) { // past limit, quit
                            break;
                        }

                        // keys inside [startKey, endKey] can still miss individual filters; skip those
                        if (startTopologyId != 0 && key.getTopologyId() != startTopologyId) {
                            continue;
                        }

                        long timestamp = key.getTimestamp();
                        if (timestamp < startTime || timestamp > endTime) {
                            continue;
                        }

                        if (startMetricId != 0 && key.getMetricId() != startMetricId) {
                            continue;
                        }

                        if (startComponentId != 0 && key.getComponentId() != startComponentId) {
                            continue;
                        }

                        if (startExecutorId != 0 && key.getExecutorId() != startExecutorId) {
                            continue;
                        }

                        if (startHostId != 0 && key.getHostnameId() != startHostId) {
                            continue;
                        }

                        if (startPort != 0 && key.getPort() != startPort) {
                            continue;
                        }

                        if (startStreamId != 0 && key.getStreamId() != startStreamId) {
                            continue;
                        }

                        RocksDbValue val = new RocksDbValue(iterator.value());

                        if (scanCallback != null) {
                            try {
                                // populate a metric
                                String metricName = metadataIdToString(KeyType.METRIC_STRING, key.getMetricId(), idToStringCache);
                                String topologyId = metadataIdToString(KeyType.TOPOLOGY_STRING, key.getTopologyId(), idToStringCache);
                                String componentId = metadataIdToString(KeyType.COMPONENT_STRING, key.getComponentId(), idToStringCache);
                                String executorId = metadataIdToString(KeyType.EXEC_ID_STRING, key.getExecutorId(), idToStringCache);
                                String hostname = metadataIdToString(KeyType.HOST_STRING, key.getHostnameId(), idToStringCache);
                                String streamId = metadataIdToString(KeyType.STREAM_ID_STRING, key.getStreamId(), idToStringCache);

                                Metric metric = new Metric(metricName, timestamp, topologyId, 0.0, componentId, executorId, hostname,
                                        streamId, key.getPort(), aggLevel);

                                val.populateMetric(metric);

                                // callback to caller
                                scanCallback.cb(metric);
                            } catch (MetricException e) {
                                // best effort: one unreportable metric must not abort the whole scan
                                LOG.warn("Failed to report found metric: {}", e.getMessage());
                            }
                        } else {
                            if (!rawCallback.cb(key, val)) {
                                return;
                            }
                        }
                    }
                }
            }
        }
    }

    // Finds the metadata string that matches the string Id and type provided.  The string should exist, as it is
    // referenced from a metric.
    private String metadataIdToString(KeyType type, int id, Map<Integer, String> lookupCache) throws MetricException {
        String s = readOnlyStringMetadataCache.getMetadataString(id);
        if (s != null) {
            return s;
        }
        s = lookupCache.get(id);
        if (s != null) {
            return s;
        }
        // get from DB and add to lookup cache
        RocksDbKey key = new RocksDbKey(type, id);
        try {
            byte[] value = db.get(key.getRaw());
            if (value == null) {
                throw new MetricException("Failed to find metadata string for id " + id + " of type " + type);
            }
            RocksDbValue rdbValue = new RocksDbValue(value);
            s = rdbValue.getMetdataString();
            lookupCache.put(id, s);
            return s;
        } catch (RocksDBException e) {
            markFailure();
            throw new MetricException("Failed to get from RocksDb", e);
        }
    }

    // deletes metrics matching the filter options
    void deleteMetrics(FilterOptions filter) throws MetricException {
        try (WriteBatch writeBatch = new WriteBatch();
             WriteOptions writeOps = new WriteOptions()) {

            // collect every matching key into one batch, then apply the batch in a single write
            scanRaw(filter, (RocksDbKey key, RocksDbValue value) -> {
                writeBatch.remove(key.getRaw());
                return true;
            });

            if (writeBatch.count() > 0) {
                LOG.info("Deleting {} metrics", writeBatch.count());
                try {
                    db.write(writeOps, writeBatch);
                } catch (Exception e) {
                    String message = "Failed delete metrics";
                    LOG.error(message, e);
                    markFailure();
                    throw new MetricException(message, e);
                }
            }
        }
    }

    // deletes metadata strings before the provided timestamp
    void deleteMetadataBefore(long firstValidTimestamp) throws MetricException {
        if (firstValidTimestamp < 1L) {
            markFailure();
            throw new MetricException("Invalid timestamp for deleting metadata: " + firstValidTimestamp);
        }

        try (WriteBatch writeBatch = new WriteBatch();
             WriteOptions writeOps = new WriteOptions()) {

            // search all metadata strings
            RocksDbKey topologyMetadataPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_START);
            RocksDbKey lastPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_END);
            scanRange(topologyMetadataPrefix, lastPrefix, (key, value) -> {
                // we'll assume the metadata was recently used if still in the cache.
                if (!readOnlyStringMetadataCache.contains(key.getMetadataStringId())) {
                    if (value.getLastTimestamp() < firstValidTimestamp) {
                        writeBatch.remove(key.getRaw());
                    }
                }
                return true;
            });

            if (writeBatch.count() > 0) {
                LOG.info("Deleting {} metadata strings", writeBatch.count());
                try {
                    db.write(writeOps, writeBatch);
                } catch (Exception e) {
                    String message = "Failed delete metadata strings";
                    LOG.error(message, e);
                    markFailure();
                    throw new MetricException(message, e);
                }
            }
        }
    }
}

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java new file mode 100644 index 0000000..58b2c76 --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore.rocksdb; + +import java.nio.ByteBuffer; +import org.apache.storm.metricstore.Metric;

/**
 * Class representing the data used as a Value in RocksDB.  Values can be used either for metadata or metrics.
 *
 * <p>Formats for Metadata String values are:
 *
 * <pre>
 * Field             Size         Offset
 *
 * Version              1              0      The current metadata version - allows migrating if the format changes in the future
 * Timestamp            8              1      The time when the metadata was last used by a metric.  Allows deleting of old metadata.
 * Metadata String    any              9      The metadata string
 * </pre>
 *
 * <p>Formats for Metric values are:
 *
 * <pre>
 * Field             Size         Offset
 *
 * Version              1              0      The current metric version - allows migrating if the format changes in the future
 * Value                8              1      The metric value
 * Count                8              9      The metric count
 * Min                  8             17      The minimum metric value
 * Max                  8             25      The maximum metric value
 * Sum                  8             33      The sum of the metric values
 * </pre>
 */
class RocksDbValue {
    // 1 version byte + five 8-byte fields (value, count, min, max, sum)
    private static final int METRIC_VALUE_SIZE = 41;
    // 1 version byte + 8-byte timestamp; the metadata string starts at this offset
    private static final int MIN_METADATA_VALUE_SIZE = 9;
    private static final byte CURRENT_METADATA_VERSION = 0;
    private static final byte CURRENT_METRIC_VERSION = 0;
    private final byte[] value;

    /**
     * Constructor from raw data.
     *
     * @param value the raw bytes representing the value
     */
    RocksDbValue(byte[] value) {
        this.value = value;
    }

    /**
     * Constructor for a metadata string.
     *
     * @param lastTimestamp the last timestamp when the string was used
     * @param metadataString the metadata string
     */
    RocksDbValue(long lastTimestamp, String metadataString) {
        // Size the buffer from the encoded bytes, not String.length(): the two differ for any character that
        // does not encode to exactly one byte, which previously overflowed the buffer or left trailing zero bytes.
        // NOTE(review): the platform default charset is kept so existing stored values still decode the same way
        // in getMetdataString(); confirm whether an explicit charset is wanted.
        byte[] stringBytes = metadataString.getBytes();
        this.value = new byte[MIN_METADATA_VALUE_SIZE + stringBytes.length];
        ByteBuffer bb = ByteBuffer.wrap(value);
        bb.put(CURRENT_METADATA_VERSION);
        bb.putLong(lastTimestamp);
        bb.put(stringBytes);
    }

    /**
     * Constructor for a metric.
     *
     * @param m the metric to create a value from
     */
    RocksDbValue(Metric m) {
        this.value = new byte[METRIC_VALUE_SIZE];
        ByteBuffer bb = ByteBuffer.wrap(value);
        bb.put(CURRENT_METRIC_VERSION);
        bb.putDouble(m.getValue());
        bb.putLong(m.getCount());
        bb.putDouble(m.getMin());
        bb.putDouble(m.getMax());
        bb.putDouble(m.getSum());
    }

    /**
     * Get the metadata string portion of the value.  Assumes the value is metadata.
     *
     * @return the metadata string
     * @throws RuntimeException if the value is shorter than the fixed metadata header
     */
    String getMetdataString() {
        if (this.value.length < MIN_METADATA_VALUE_SIZE) {
            throw new RuntimeException("RocksDB value is too small to be a metadata string!");
        }
        return new String(this.value, MIN_METADATA_VALUE_SIZE, this.value.length - MIN_METADATA_VALUE_SIZE);
    }

    /**
     * Gets StringMetadata associated with the key/value pair.
     *
     * @param key the key this value is stored under; supplies the metadata type and string id
     * @return the metadata built from the key's type and id and this value's last-used timestamp
     */
    StringMetadata getStringMetadata(RocksDbKey key) {
        return new StringMetadata(key.getType(), key.getMetadataStringId(), this.getLastTimestamp());
    }

    /**
     * Gets the last time a metadata string was used.  Assumes the value is metadata.
     *
     * @return the 8-byte timestamp stored after the version byte
     */
    long getLastTimestamp() {
        return ByteBuffer.wrap(value, 1, 8).getLong();
    }

    /**
     * Get the raw value bytes.
     *
     * @return the backing byte array (not a copy)
     */
    byte[] getRaw() {
        return this.value;
    }

    /**
     * Populate metric values from the raw data.  Assumes the value is a metric.
     *
     * @param metric the metric whose value, count, min, max and sum are set from this value
     */
    void populateMetric(Metric metric) {
        ByteBuffer bb = ByteBuffer.wrap(this.value, 0, METRIC_VALUE_SIZE);
        bb.get();  // version
        metric.setValue(bb.getDouble());
        metric.setCount(bb.getLong());
        metric.setMin(bb.getDouble());
        metric.setMax(bb.getDouble());
        metric.setSum(bb.getDouble());
    }

}

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java new file mode 100644 index 0000000..6f54a58 --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore.rocksdb; + +import java.util.ArrayList; +import java.util.List;

/**
 * In-memory record kept for a cached metadata string: the key types it is used as, its unique id, and the
 * newest metric timestamp that referenced it.
 */
class StringMetadata {
    // one string can serve as several kinds of metadata, so the types are kept in a list
    private List<KeyType> types = new ArrayList<>(1);
    private int stringId;
    private long lastTimestamp;

    /**
     * Creates the metadata for a string with a single initial type.
     *
     * @param metadataType the type of metadata string
     * @param stringId the unique id for the metadata string
     * @param lastTimestamp the timestamp when the metric used the metadata string
     */
    StringMetadata(KeyType metadataType, Integer stringId, Long lastTimestamp) {
        types.add(metadataType);
        this.stringId = stringId;
        this.lastTimestamp = lastTimestamp;
    }

    int getStringId() {
        return stringId;
    }

    long getLastTimestamp() {
        return lastTimestamp;
    }

    List<KeyType> getMetadataTypes() {
        return types;
    }

    /**
     * Records a use of the metadata string by a metric: the stored timestamp only ever moves forward, and the
     * type is remembered if this string has not been used as that type before.
     *
     * @param metricTimestamp the timestamp of the metric using the metadata string
     * @param type the type of metadata string for the metric
     */
    void update(Long metricTimestamp, KeyType type) {
        lastTimestamp = Math.max(lastTimestamp, metricTimestamp);
        if (!types.contains(type)) {
            types.add(type);
        }
    }

}

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java new file mode 100644 index 0000000..7ce8435 --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.storm.metricstore.rocksdb; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.storm.metricstore.MetricException; +import org.apache.storm.utils.LruMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class to create a use a cache that stores Metadata string information in memory. It allows searching for a + * Metadata string's unique id, or looking up the string by the unique id. The StringMetadata is stored in an + * LRU map. When an entry is added to the cache, an older entry may be evicted, which then needs to be + * immediately stored to the database to provide a consistent view of all the metadata strings. + * + * <p>All write operations adding metadata to RocksDB are done by a single thread (a RocksDbMetricsWriter), + * but multiple threads can read values from the cache. To clarify which permissions are accessible by various + * threads, the ReadOnlyStringMetadataCache and WritableStringMetadataCache are provided to be used. + */ + +public class StringMetadataCache implements LruMap.CacheEvictionCallback<String, StringMetadata>, + WritableStringMetadataCache, ReadOnlyStringMetadataCache { + private static final Logger LOG = LoggerFactory.getLogger(StringMetadataCache.class); + private Map<String, StringMetadata> lruStringCache; + private Map<Integer, String> hashToString = new ConcurrentHashMap<>(); + private RocksDbMetricsWriter dbWriter; + private static StringMetadataCache instance = null; + + /** + * Initializes the cache instance. 
+ * + * @param dbWriter the RocksDB writer instance to handle writing evicted cache data + * @param capacity the number of StringMetadata instances to hold in memory + * @throws MetricException if creating multiple cache instances + */ + static void init(RocksDbMetricsWriter dbWriter, int capacity) throws MetricException { + if (instance == null) { + instance = new StringMetadataCache(dbWriter, capacity); + } else { + throw new MetricException("StringMetadataCache already created"); + } + } + + /** + * Provides the WritableStringMetadataCache interface to the cache instance. + * + * @throws MetricException if the cache instance was not created + */ + static WritableStringMetadataCache getWritableStringMetadataCache() throws MetricException { + if (instance != null) { + return instance; + } else { + throw new MetricException("StringMetadataCache was not initialized"); + } + } + + /** + * Provides the ReadOnlyStringMetadataCache interface to the cache instance. + * + * @throws MetricException if the cache instance was not created + */ + static ReadOnlyStringMetadataCache getReadOnlyStringMetadataCache() throws MetricException { + if (instance != null) { + return instance; + } else { + throw new MetricException("StringMetadataCache was not initialized"); + } + } + + /** + * Constructor to create a cache. + * + * @param dbWriter The rocks db writer instance the cache should use when evicting data + * @param capacity The cache size + */ + private StringMetadataCache(RocksDbMetricsWriter dbWriter, int capacity) { + lruStringCache = Collections.synchronizedMap(new LruMap<>(capacity, this)); + this.dbWriter = dbWriter; + } + + /** + * Get the string metadata from the cache. + * + * @param s The string to look for + * @return the metadata associated with the string or null if not found + */ + public StringMetadata get(String s) { + return lruStringCache.get(s); + } + + /** + * Add the string metadata to the cache. 
+ * + * NOTE: this can cause data to be evicted from the cache when full. When this occurs, the evictionCallback() method + * is called to store the metadata back into the RocksDB database. + * + * This method is only exposed to the WritableStringMetadataCache interface. + * + * @param s The string to add + * @param stringMetadata The string's metadata + * @param newEntry Indicates the metadata is being used for the first time and should be written to RocksDB immediately + * @throws MetricException when evicted data fails to save to the database or when the database is shutdown + */ + public void put(String s, StringMetadata stringMetadata, boolean newEntry) throws MetricException { + if (dbWriter.isShutdown()) { + // another thread could be writing out the metadata cache to the database. + throw new MetricException("Shutting down"); + } + try { + if (newEntry) { + writeMetadataToDisk(s, stringMetadata); + } + lruStringCache.put(s, stringMetadata); + hashToString.put(stringMetadata.getStringId(), s); + } catch (Exception e) { // catch any runtime exceptions caused by eviction + throw new MetricException("Failed to save string in metadata cache", e); + } + } + + /** + * Callback when data is about to be removed from the cache. This method then + * immediately writes the metadata to RocksDB. 
+ * + * @param key The evicted string + * @param val The evicted string's metadata + * @throws RuntimeException when evicted data fails to save to the database + */ + public void evictionCallback(String key, StringMetadata val) { + writeMetadataToDisk(key, val); + } + + private void writeMetadataToDisk(String key, StringMetadata val) { + if (LOG.isDebugEnabled()) { + LOG.debug("Writing {} to RocksDB", key); + } + // remove reverse lookup from map + hashToString.remove(val.getStringId()); + + // save the evicted key/value to the database immediately + RocksDbValue rval = new RocksDbValue(val.getLastTimestamp(), key); + + for (KeyType type : val.getMetadataTypes()) { // save the metadata for all types of strings it matches + RocksDbKey rkey = new RocksDbKey(type, val.getStringId()); + dbWriter.handleEvictedMetadata(rkey, rval); + } + } + + /** + * Determines if a string Id is contained in the cache. + * + * @param stringId The string Id to check + * @return true if the Id is in the cache, false otherwise + */ + public boolean contains(Integer stringId) { + return hashToString.containsKey(stringId); + } + + /** + * Returns the string matching the string Id if in the cache. + * + * @param stringId The string Id to check + * @return the associated string if the Id is in the cache, null otherwise + */ + public String getMetadataString(Integer stringId) { + return hashToString.get(stringId); + } + + /** + * Get the map of the cache contents. Provided to allow writing the data to RocksDB on shutdown. 
+ * + * @return the string metadata map entrySet + */ + public Set<Map.Entry<String, StringMetadata>> entrySet() { + return lruStringCache.entrySet(); + } + + static void cleanUp() { + instance = null; + } + +} + http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java new file mode 100644 index 0000000..2d4165f --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore.rocksdb; + +import java.util.Map; +import java.util.Set; + +import org.apache.http.annotation.NotThreadSafe; +import org.apache.storm.metricstore.MetricException; + +/** + * The writable interface to a StringMetadataCache intended to be used by a single RocksDBMetricwWriter instance. 
+ */
+@NotThreadSafe
+public interface WritableStringMetadataCache extends ReadOnlyStringMetadataCache {
+
+    /**
+     * Add the string metadata to the cache.
+     *
+     * <p>NOTE: this can cause data to be evicted from the cache when full.  When this occurs, the evictionCallback()
+     * method is called to store the metadata back into the RocksDB database.
+     *
+     * <p>This method is only exposed to the WritableStringMetadataCache interface.
+     *
+     * @param s The string to add
+     * @param stringMetadata  The string's metadata
+     * @param newEntry Indicates the metadata is being used for the first time and should be written to RocksDB
+     *                 immediately
+     * @throws MetricException when evicted data fails to save to the database or when the database is shutdown
+     */
+    void put(String s, StringMetadata stringMetadata, boolean newEntry) throws MetricException;
+
+    /**
+     * Get the entry set of the cache contents.  Provided to allow writing the data to RocksDB on shutdown.
+     *
+     * @return the string metadata map entrySet
+     */
+    Set<Map.Entry<String, StringMetadata>> entrySet();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/utils/LruMap.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/utils/LruMap.java b/storm-server/src/main/java/org/apache/storm/utils/LruMap.java new file mode 100644 index 0000000..3ed5d06 --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/utils/LruMap.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.utils; + +import java.util.LinkedHashMap; +import java.util.Map; + +public class LruMap<A, B> extends LinkedHashMap<A, B> { + private int maxSize; + private CacheEvictionCallback evCb = null; + + public LruMap(int maxSize) { + super(maxSize + 1, 1.0f, true); + this.maxSize = maxSize; + } + + /** + * Creates an LRU map that will call back before data is removed from the map. + * + * @param maxSize max capacity for the map + * @param evictionCallback callback to be called before removing data + */ + public LruMap(int maxSize, CacheEvictionCallback evictionCallback) { + this(maxSize); + this.evCb = evictionCallback; + } + + @Override + protected boolean removeEldestEntry(final Map.Entry<A, B> eldest) { + boolean evict = size() > this.maxSize; + if (evict && this.evCb != null) { + this.evCb.evictionCallback(eldest.getKey(), eldest.getValue()); + } + return evict; + } + + public interface CacheEvictionCallback<K, V> { + void evictionCallback(K key, V val); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbKeyTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbKeyTest.java b/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbKeyTest.java new file mode 100644 index 0000000..21d2377 --- /dev/null +++ b/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbKeyTest.java @@ -0,0 +1,74 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore.rocksdb; + +import org.apache.storm.metricstore.AggLevel; +import org.junit.Assert; +import org.junit.Test; + +public class RocksDbKeyTest { + + @Test + public void testConstructors() { + byte[] raw = new byte[RocksDbKey.KEY_SIZE]; + raw[0] = KeyType.COMPONENT_STRING.getValue(); + raw[2] = 0x01; + raw[3] = 0x02; + raw[4] = 0x03; + raw[5] = 0x04; + RocksDbKey rawKey = new RocksDbKey(raw); + + RocksDbKey metadataKey = new RocksDbKey(KeyType.COMPONENT_STRING, 0x01020304); + Assert.assertEquals(0, metadataKey.compareTo(rawKey)); + Assert.assertEquals(KeyType.COMPONENT_STRING, metadataKey.getType()); + + metadataKey = new RocksDbKey(KeyType.TOPOLOGY_STRING, 0x01020304); + Assert.assertTrue(metadataKey.compareTo(rawKey) < 0); + Assert.assertEquals(KeyType.TOPOLOGY_STRING, metadataKey.getType()); + + metadataKey = new RocksDbKey(KeyType.COMPONENT_STRING, 0x01020305); + Assert.assertTrue(metadataKey.compareTo(rawKey) > 0); + + Assert.assertEquals(0x01020304, rawKey.getTopologyId()); + Assert.assertEquals(KeyType.COMPONENT_STRING, rawKey.getType()); + } + + @Test + public void testMetricKey() { + AggLevel aggLevel = AggLevel.AGG_LEVEL_10_MIN; 
+ int topologyId = 0x45665; + long timestamp = System.currentTimeMillis(); + int metricId = 0xF3916034; + int componentId = 0x82915031; + int executorId = 0x434738; + int hostId = 0x4348394; + int port = 3456; + int streamId = 0x84221956; + RocksDbKey key = RocksDbKey.createMetricKey(aggLevel, topologyId, timestamp, metricId, + componentId, executorId, hostId, port, streamId); + Assert.assertEquals(topologyId, key.getTopologyId()); + Assert.assertEquals(timestamp, key.getTimestamp()); + Assert.assertEquals(metricId, key.getMetricId()); + Assert.assertEquals(componentId, key.getComponentId()); + Assert.assertEquals(executorId, key.getExecutorId()); + Assert.assertEquals(hostId, key.getHostnameId()); + Assert.assertEquals(port, key.getPort()); + Assert.assertEquals(streamId, key.getStreamId()); + } +}
