Github user agresch commented on a diff in the pull request:
https://github.com/apache/storm/pull/2504#discussion_r162668430
--- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java ---
@@ -0,0 +1,639 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import com.codahale.metrics.Meter;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.metricstore.AggLevel;
+import org.apache.storm.metricstore.FilterOptions;
+import org.apache.storm.metricstore.Metric;
+import org.apache.storm.metricstore.MetricException;
+import org.apache.storm.metricstore.MetricStore;
+import org.apache.storm.utils.ObjectReader;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.IndexType;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RocksDbStore implements MetricStore, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDbStore.class);
+    private static final int MAX_QUEUE_CAPACITY = 4000;
+    static final int INVALID_METADATA_STRING_ID = 0;
+    RocksDB db;
+    private ReadOnlyStringMetadataCache readOnlyStringMetadataCache = null;
+    private BlockingQueue queue = new LinkedBlockingQueue(MAX_QUEUE_CAPACITY);
+    private RocksDbMetricsWriter metricsWriter = null;
+    private MetricsCleaner metricsCleaner = null;
+    private Meter failureMeter = null;
+
+    interface RocksDbScanCallback {
+        boolean cb(RocksDbKey key, RocksDbValue val);   // return false to stop scan
+    }
+
+    /**
+     * Create metric store instance using the configurations provided via the config map.
+     *
+     * @param config Storm config map
+     * @throws MetricException on preparation error
+     */
+    public void prepare(Map config) throws MetricException {
+        validateConfig(config);
+
+        this.failureMeter = StormMetricsRegistry.registerMeter("RocksDB:metric-failures");
+
+        RocksDB.loadLibrary();
+        boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);
+
+        try (Options options = new Options().setCreateIfMissing(createIfMissing)) {
+            // use the hash index for prefix searches
+            BlockBasedTableConfig tfc = new BlockBasedTableConfig();
+            tfc.setIndexType(IndexType.kHashSearch);
+            options.setTableFormatConfig(tfc);
+            options.useCappedPrefixExtractor(RocksDbKey.KEY_SIZE);
+
+            String path = getRocksDbAbsoluteDir(config);
+            LOG.info("Opening RocksDB from {}", path);
+            db = RocksDB.open(options, path);
+        } catch (RocksDBException e) {
+            String message = "Error opening RocksDB database";
+            LOG.error(message, e);
+            throw new MetricException(message, e);
+        }
+
+        // create thread to delete old metrics and metadata
+        Integer retentionHours = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS).toString());
+        Integer deletionPeriod = 0;
+        if (config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS)) {
+            deletionPeriod = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS).toString());
+        }
+        metricsCleaner = new MetricsCleaner(this, retentionHours, deletionPeriod, failureMeter);
+
+        // create thread to process insertion of all metrics
+        metricsWriter = new RocksDbMetricsWriter(this, this.queue, this.failureMeter);
+
+        int cacheCapacity = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY).toString());
+        StringMetadataCache.init(metricsWriter, cacheCapacity);
+        readOnlyStringMetadataCache = StringMetadataCache.getReadOnlyStringMetadataCache();
+        metricsWriter.init(); // init the writer once the cache is set up
+
+        // start threads after the metadata cache is created
+        Thread thread = new Thread(metricsCleaner, "RocksDbMetricsCleaner");
+        thread.setDaemon(true);
+        thread.start();
+
+        thread = new Thread(metricsWriter, "RocksDbMetricsWriter");
+        thread.setDaemon(true);
+        thread.start();
+    }
+
+    /**
+     * Validates the Storm configuration for the metrics store.
+     *
+     * @param config Storm config specifying the store type, store location, and creation policy
+     * @throws MetricException if a required configuration is missing, or if the store does not exist but
+     *                         the config specifies not to create the store
+     */
+    private void validateConfig(Map config) throws MetricException {
+        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_LOCATION))) {
+            throw new MetricException("Not a valid RocksDB configuration - Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION);
+        }
+
+        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING))) {
+            throw new MetricException("Not a valid RocksDB configuration - Does not specify creation policy "
+                    + DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING);
+        }
+
+        // validate path defined
+        String storePath = getRocksDbAbsoluteDir(config);
+
+        boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);
+        if (!createIfMissing) {
+            if (!(new File(storePath).exists())) {
+                throw new MetricException("Configuration specifies not to create a store but no store currently exists at " + storePath);
+            }
+        }
+
+        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY))) {
+            throw new MetricException("Not a valid RocksDB configuration - Missing metadata string cache size "
+                    + DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY);
+        }
+
+        if (!config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS)) {
+            throw new MetricException("Not a valid RocksDB configuration - Missing metric retention "
+                    + DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS);
+        }
+    }
+
+    private String getRocksDbAbsoluteDir(Map conf) throws MetricException {
+        String storePath = (String) conf.get(DaemonConfig.STORM_ROCKSDB_LOCATION);
+        if (storePath == null) {
+            throw new MetricException("Not a valid RocksDB configuration - Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION);
+        } else {
+            if (new File(storePath).isAbsolute()) {
+                return storePath;
+            } else {
+                String stormHome = System.getProperty("storm.home");
--- End diff --
I did not find one. Can you point out which constant I should use (or
where to create one)?
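
For context, here is roughly what I'd expect if we introduce a constant for this. This is only a sketch: the constant name and where it lives are my guesses (that's the open question above), and the tail of the method is my assumption since the diff is cut off.

```java
// Hypothetical sketch, not existing Storm code; assumes the same imports as RocksDbStore.java above.
// A candidate constant, e.g. in DaemonConfig or a shared utils class:
public static final String STORM_HOME = "storm.home";

private String getRocksDbAbsoluteDir(Map conf) throws MetricException {
    String storePath = (String) conf.get(DaemonConfig.STORM_ROCKSDB_LOCATION);
    if (storePath == null) {
        throw new MetricException("Not a valid RocksDB configuration - Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION);
    }
    if (new File(storePath).isAbsolute()) {
        return storePath;
    }
    // replaces the "storm.home" string literal flagged in this review
    String stormHome = System.getProperty(STORM_HOME);
    if (stormHome == null) {
        // assumed error handling for the truncated branch
        throw new MetricException(STORM_HOME + " system property is not set");
    }
    return stormHome + File.separator + storePath;
}
```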
---