Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2504#discussion_r160245178
--- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java ---
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import com.codahale.metrics.Meter;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.storm.metricstore.AggLevel;
+import org.apache.storm.metricstore.Metric;
+import org.apache.storm.metricstore.MetricException;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class designed to perform all metrics inserts into RocksDB.  Metrics are processed from a blocking queue.
+ * <p>
+ * A writable LRU StringMetadataCache is used to minimize looking up metadata string Ids.  As entries are added to the full
+ * cache, older entries are evicted from the cache and need to be written to the database.  This happens via the
+ * handleEvictedMetadata() method callback.
+ */
+public class RocksDbMetricsWriter implements Runnable, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDbMetricsWriter.class);
+    private RocksDbStore store;
+    private BlockingQueue queue;
+    private WritableStringMetadataCache stringMetadataCache;
+    private Set<Integer> unusedIds = new HashSet<>();
+    private TreeMap<RocksDbKey, RocksDbValue> insertBatch = new TreeMap<>(); // RocksDB should insert in sorted key order
+    private WriteOptions writeOpts = new WriteOptions();
+    private volatile boolean shutdown = false;
+    private Meter failureMeter;
+    private ArrayList<AggLevel> aggBuckets = new ArrayList<>();
+
+    /**
+     * Constructor for the RocksDbMetricsWriter.
+     *
+     * @param store The RocksDB store
+     * @param queue The queue to receive metrics for insertion
+     * @param failureMeter The meter to mark when a metric insert fails
+     */
+    RocksDbMetricsWriter(RocksDbStore store, BlockingQueue queue, Meter failureMeter) {
+        this.store = store;
+        this.queue = queue;
+        this.failureMeter = failureMeter;
+
+        aggBuckets.add(AggLevel.AGG_LEVEL_1_MIN);
+        aggBuckets.add(AggLevel.AGG_LEVEL_10_MIN);
+        aggBuckets.add(AggLevel.AGG_LEVEL_60_MIN);
+    }
+
+    /**
+     * Init routine called once the Metadata cache has been created.
+     *
+     * @throws MetricException on cache error
+     */
+    void init() throws MetricException {
+        this.stringMetadataCache = StringMetadataCache.getWritableStringMetadataCache();
+    }
+
+    /**
+     * Run routine to wait for metrics on a queue and insert into RocksDB.
+     */
+    @Override
+    public void run() {
+        while (true) {
+            if (shutdown) {
+                return;
+            }
+            try {
+                Metric m = (Metric) queue.take();
+                processInsert(m);
+            } catch (Exception e) {
+                LOG.error("Failed to insert metric", e);
+                if (this.failureMeter != null) {
+                    this.failureMeter.mark();
+                }
+            }
+        }
+    }
+
+    /**
+     * Performs the actual metric insert, and aggregates over all bucket times.
+     *
+     * @param metric Metric to store
+     * @throws MetricException if database write fails
+     */
+    private void processInsert(Metric metric) throws MetricException {
+
+        // convert all strings to numeric Ids for the metric key and add to the metadata cache
+        long metricTimestamp = metric.getTimestamp();
+        Integer topologyId = storeMetadataString(KeyType.TOPOLOGY_STRING, metric.getTopologyId(), metricTimestamp);
+        Integer metricId = storeMetadataString(KeyType.METRIC_STRING, metric.getMetricName(), metricTimestamp);
+        Integer componentId = storeMetadataString(KeyType.COMPONENT_STRING, metric.getComponentId(), metricTimestamp);
+        Integer executorId = storeMetadataString(KeyType.EXEC_ID_STRING, metric.getExecutorId(), metricTimestamp);
+        Integer hostId = storeMetadataString(KeyType.HOST_STRING, metric.getHostname(), metricTimestamp);
+        Integer streamId = storeMetadataString(KeyType.STREAM_ID_STRING, metric.getStreamId(), metricTimestamp);
+
+        RocksDbKey key = RocksDbKey.createMetricKey(AggLevel.AGG_LEVEL_NONE, topologyId, metric.getTimestamp(), metricId,
+                componentId, executorId, hostId, metric.getPort(), streamId);
+
+        // save metric key/value to be batched
+        RocksDbValue value = new RocksDbValue(metric);
+        insertBatch.put(key, value);
+
+        // Aggregate matching metrics over bucket timeframes.
+        // We'll process starting with the longest bucket.  If the metric for this bucket does not exist, we don't have to
+        // search for the remaining shorter bucket metrics.
+        ListIterator<AggLevel> li = aggBuckets.listIterator(aggBuckets.size());
+        boolean populate = true;
+        while (li.hasPrevious()) {
+            AggLevel bucket = li.previous();
+            Metric aggMetric = new Metric(metric);
+            aggMetric.setAggLevel(bucket);
+
+            // round the metric timestamp down to the start of its bucket,
+            // e.g. a 10:07:23 timestamp with a 10 minute bucket rounds to 10:00:00
+            long msToBucket = 1000L * 60L * bucket.getValue();
+            long roundedToBucket = msToBucket * (metric.getTimestamp() / msToBucket);
+            aggMetric.setTimestamp(roundedToBucket);
+
+            RocksDbKey aggKey = RocksDbKey.createMetricKey(bucket, topologyId, aggMetric.getTimestamp(), metricId,
+                    componentId, executorId, hostId, aggMetric.getPort(), streamId);
+
+            if (populate) {
+                // retrieve any existing aggregation matching this one and update the values
+                if (store.populateFromKey(aggKey, aggMetric)) {
+                    aggMetric.addValue(metric.getValue());
+                } else {
+                    // the aggregated metric did not exist; don't look for further ones with shorter buckets
+                    populate = false;
+                }
+            }
+
+            // save metric key/value to be batched
+            RocksDbValue aggVal = new RocksDbValue(aggMetric);
+            insertBatch.put(aggKey, aggVal);
+        }
+
+        processBatchInsert(insertBatch);
+
+        insertBatch.clear();
+    }
+
+    // converts a metadata string into a unique integer.  Updates the timestamp of the string
+    // so we can track when it was last used for later deletion on database cleanup.
+    private int storeMetadataString(KeyType type, String s, long metricTimestamp) throws MetricException {
+        if (s == null) {
+            throw new MetricException("No string for metric metadata string type " + type);
+        }
+
+        // attempt to find it in the string cache
+        StringMetadata stringMetadata = stringMetadataCache.get(s);
+        if (stringMetadata != null) {
+            // make sure the timestamp on the metadata has the latest time
+            stringMetadata.update(metricTimestamp, type);
+            return stringMetadata.getStringId();
+        }
+
+        // attempt to find the string in the database
+        stringMetadata = store.rocksDbGetStringMetadata(type, s);
+        if (stringMetadata != null) {
+            // update to the latest timestamp and add to the string cache
+            stringMetadata.update(metricTimestamp, type);
+            stringMetadataCache.put(s, stringMetadata, false);
+            return stringMetadata.getStringId();
+        }
+
+        // string does not exist, create using a unique string id and add to cache
+        LOG.debug("{}.{} does not exist in cache or database", type, s);
+        int stringId = getUniqueMetadataStringId();
+        stringMetadata = new StringMetadata(type, stringId, metricTimestamp);
+        stringMetadataCache.put(s, stringMetadata, true);
+
+        return stringMetadata.getStringId();
+    }
+
+    // get a currently unused unique string id
+    private int getUniqueMetadataStringId() throws MetricException {
+        generateUniqueStringIds();
+        int id = unusedIds.iterator().next();
+        unusedIds.remove(id);
+        return id;
+    }
+
+    // guarantees a set of unused string Ids exists.  Once the set is empty, creates a new one
+    // by generating random numbers and removing the ones that are already in use.
+    private void generateUniqueStringIds() throws MetricException {
+        int attempts = 0;
+        while (unusedIds.isEmpty()) {
+            attempts++;
+            if (attempts > 100) {
+                String message = "Failed to generate unique ids";
+                LOG.error(message);
+                throw new MetricException(message);
+            }
+            for (int i = 0; i < 600; i++) {
+                int n = ThreadLocalRandom.current().nextInt();
+                if (n == RocksDbStore.INVALID_METADATA_STRING_ID) {
+                    continue;
+                }
+                // skip any ids currently in use in the cache
+                if (stringMetadataCache.contains(n)) {
+                    continue;
+                }
+                unusedIds.add(n);
+            }
+            // now scan all metadata and remove any matching string Ids from this list
+            RocksDbKey firstPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_START);
+            RocksDbKey lastPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_END);
+            store.scanRange(firstPrefix, lastPrefix, (key, value) -> {
+                unusedIds.remove(key.getMetadataStringId());
+                return true; // process all metadata
+            });
+        }
+    }
+
+    // writes multiple metric values into the database as a batch operation
+    private void processBatchInsert(Map<RocksDbKey, RocksDbValue> batchMap) throws MetricException {
+        try (WriteBatch writeBatch = new WriteBatch()) {
--- End diff ---
Because of the thread safety problems with the map, it might be simpler to
use the WriteBatch object directly instead of trying to use the map first.
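
Something along these lines (just a sketch -- `getRaw()` for the key/value bytes and a batched `write()` on the store are assumptions here, not necessarily the API in this patch):

```java
// Sketch: append entries straight to a WriteBatch instead of staging them in a TreeMap.
// The batch is only ever touched from this writer thread, so no shared map is needed.
private final WriteBatch writeBatch = new WriteBatch();

private void addToBatch(RocksDbKey key, RocksDbValue value) throws RocksDBException {
    writeBatch.put(key.getRaw(), value.getRaw());  // getRaw() is hypothetical
}

private void flushBatch() throws RocksDBException {
    store.write(writeOpts, writeBatch);  // assumes the store exposes a batched write
    writeBatch.clear();                  // reset the batch for reuse
}
```

Dropping the TreeMap does give up handing keys to RocksDB in sorted order, but the batch still applies atomically either way.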
---