Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r162544306
  
    --- Diff: 
storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java
 ---
    @@ -0,0 +1,323 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.storm.metricstore.rocksdb;
    +
    +import com.codahale.metrics.Meter;
    +
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ThreadLocalRandom;
    +
    +import org.apache.http.annotation.NotThreadSafe;
    +import org.apache.storm.metricstore.AggLevel;
    +import org.apache.storm.metricstore.Metric;
    +import org.apache.storm.metricstore.MetricException;
    +import org.rocksdb.FlushOptions;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.WriteBatch;
    +import org.rocksdb.WriteOptions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Class designed to perform all metrics inserts into RocksDB.  Metrics 
are processed from a blocking queue.  Inserts
    + * to RocksDB are done using a single thread to simplify design (such as 
looking up existing metric data for aggregation,
    + * and fetching/evicting metadata from the cache).
    + * </P>
    + * A writable LRU StringMetadataCache is used to minimize looking up 
metadata string Ids.  As entries are added to the full cache, older
    + * entries are evicted from the cache and need to be written to the 
database.  This happens as the handleEvictedMetadata()
    + * method callback.
    + * </P>
    + * The following issues would need to be addressed to implement a 
multithreaded metrics writer:
    + * <ul>
    + *     <li>Generation of unique unused IDs for new metadata strings needs 
to be thread safe.</li>
    + *     <li>Ensuring newly created metadata strings are seen by all 
threads.</li>
    + *     <li>Maintaining a properly cached state of metadata for multiple 
writers.  The current LRU cache
    + *     evicts data as new metadata is added.</li>
    + *     <li>Processing the aggregation of a metric requires fetching and 
updating previous aggregates.  A multithreaded
    + *     design would need to ensure two metrics were not updating an 
aggregated metric at the same time.</li>
    + *     <li>Investigate performance of multiple threads inserting into 
RocksDB versus a single ordered insert.</li>
    + * </ul>
    + */
    +@NotThreadSafe
    +public class RocksDbMetricsWriter implements Runnable, AutoCloseable {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(RocksDbMetricsWriter.class);
    +    private RocksDbStore store;
    +    private BlockingQueue queue;
    +    private WritableStringMetadataCache stringMetadataCache;
    +    private Set<Integer> unusedIds = new HashSet<>();
    +    private TreeMap<RocksDbKey, RocksDbValue> insertBatch = new 
TreeMap<>(); // RocksDB should insert in sorted key order
    +    private WriteOptions writeOpts = new WriteOptions();
    +    private volatile boolean shutdown = false;
    +    private Meter failureMeter;
    +    private ArrayList<AggLevel> aggBuckets = new ArrayList<>();
    +
    +    /**
    +     * Constructor for the RocksDbMetricsWriter.
    +     *
    +     * @param store   The RocksDB store
    +     * @param queue   The queue to receive metrics for insertion
    +     */
    +    RocksDbMetricsWriter(RocksDbStore store, BlockingQueue queue, Meter 
failureMeter)  {
    +        this.store = store;
    +        this.queue = queue;
    +        this.failureMeter = failureMeter;
    +
    +        aggBuckets.add(AggLevel.AGG_LEVEL_1_MIN);
    +        aggBuckets.add(AggLevel.AGG_LEVEL_10_MIN);
    +        aggBuckets.add(AggLevel.AGG_LEVEL_60_MIN);
    +    }
    +
    +    /**
    +     * Init routine called once the Metadata cache has been created.
    +     *
    +     * @throws MetricException  on cache error
    +     */
    +    void init() throws MetricException {
    +        this.stringMetadataCache = 
StringMetadataCache.getWritableStringMetadataCache();
    +    }
    +
    +    /**
    +     * Run routine to wait for metrics on a queue and insert into RocksDB.
    +     */
    +    @Override
    +    public void run() {
    +        while (true) {
    +            if (shutdown) {
    --- End diff --
    
    Same here.


---

Reply via email to