Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2504#discussion_r162544306
--- Diff:
storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java
---
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import com.codahale.metrics.Meter;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.http.annotation.NotThreadSafe;
+import org.apache.storm.metricstore.AggLevel;
+import org.apache.storm.metricstore.Metric;
+import org.apache.storm.metricstore.MetricException;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class designed to perform all metrics inserts into RocksDB. Metrics
are processed from a blocking queue. Inserts
+ * to RocksDB are done using a single thread to simplify design (such as
looking up existing metric data for aggregation,
+ * and fetching/evicting metadata from the cache).
+ * </P>
+ * A writable LRU StringMetadataCache is used to minimize looking up
metadata string Ids. As entries are added to the full cache, older
+ * entries are evicted from the cache and need to be written to the
database. This happens as the handleEvictedMetadata()
+ * method callback.
+ * </P>
+ * The following issues would need to be addressed to implement a
multithreaded metrics writer:
+ * <ul>
+ * <li>Generation of unique unused IDs for new metadata strings needs
to be thread safe.</li>
+ * <li>Ensuring newly created metadata strings are seen by all
threads.</li>
+ * <li>Maintaining a properly cached state of metadata for multiple
writers. The current LRU cache
+ * evicts data as new metadata is added.</li>
+ * <li>Processing the aggregation of a metric requires fetching and
updating previous aggregates. A multithreaded
+ * design would need to ensure two metrics were not updating an
aggregated metric at the same time.</li>
+ * <li>Investigate performance of multiple threads inserting into
RocksDB versus a single ordered insert.</li>
+ * </ul>
+ */
+@NotThreadSafe
+public class RocksDbMetricsWriter implements Runnable, AutoCloseable {
+ private static final Logger LOG =
LoggerFactory.getLogger(RocksDbMetricsWriter.class);
+ private RocksDbStore store;
+ private BlockingQueue queue;
+ private WritableStringMetadataCache stringMetadataCache;
+ private Set<Integer> unusedIds = new HashSet<>();
+ private TreeMap<RocksDbKey, RocksDbValue> insertBatch = new
TreeMap<>(); // RocksDB should insert in sorted key order
+ private WriteOptions writeOpts = new WriteOptions();
+ private volatile boolean shutdown = false;
+ private Meter failureMeter;
+ private ArrayList<AggLevel> aggBuckets = new ArrayList<>();
+
+ /**
+ * Constructor for the RocksDbMetricsWriter.
+ *
+ * @param store The RocksDB store
+ * @param queue The queue to receive metrics for insertion
+ */
+ RocksDbMetricsWriter(RocksDbStore store, BlockingQueue queue, Meter
failureMeter) {
+ this.store = store;
+ this.queue = queue;
+ this.failureMeter = failureMeter;
+
+ aggBuckets.add(AggLevel.AGG_LEVEL_1_MIN);
+ aggBuckets.add(AggLevel.AGG_LEVEL_10_MIN);
+ aggBuckets.add(AggLevel.AGG_LEVEL_60_MIN);
+ }
+
+ /**
+ * Init routine called once the Metadata cache has been created.
+ *
+ * @throws MetricException on cache error
+ */
+ void init() throws MetricException {
+ this.stringMetadataCache =
StringMetadataCache.getWritableStringMetadataCache();
+ }
+
+ /**
+ * Run routine to wait for metrics on a queue and insert into RocksDB.
+ */
+ @Override
+ public void run() {
+ while (true) {
+ if (shutdown) {
--- End diff --
Same here.
---