Hi,

I am currently trying to integrate RocksDB statistics into my pipeline.

The basic idea is that we want to pass RocksDB stats through the same
pipeline that is doing our processing and write them to Elasticsearch
so that we can visualize them in Kibana.
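For context, each sample we index could be a small per-metric document. Here is a minimal sketch of such a document builder; the field names ("metric", "value", "timestamp") are placeholders I chose for illustration, not a schema Elasticsearch or Kibana requires:

```java
import java.util.HashMap;
import java.util.Map;

public class StatsDocument {

    // Build a simple key/value document for one RocksDB metric sample.
    // The field names here are illustrative placeholders only.
    public static Map<String, Object> of(String metric, long value, long timestampMs) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("metric", metric);
        doc.put("value", value);
        doc.put("timestamp", timestampMs);
        return doc;
    }
}
```

A map like this can be handed directly to an Elasticsearch index request as the document source.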

I have written a custom source function that takes the DBOptions object
from the stream environment's state backend and uses it to continuously
query RocksDB for metrics. Here's the code:

public class RocksDBStatsStreamRunner {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        RocksDBStateBackend rocksDBStateBackend =
                new RocksDBStateBackend("/tmp", true);
        rocksDBStateBackend.setOptions(new MyOptionsFactory());
        streamEnv.setStateBackend(rocksDBStateBackend);

        DBOptions dbOptions =
                ((RocksDBStateBackend) streamEnv.getStateBackend()).getDbOptions();
        // the Elasticsearch sink would be attached to this stream
        streamEnv.addSource(new RocksDBStatisticsSource(dbOptions));

        streamEnv.execute("RocksDB statistics pipeline");
    }
}


public RocksDBStatisticsSource(DBOptions dbOptions) {
    this(dbOptions, DEFAULT_SLEEP_TIME_MS);
}

public RocksDBStatisticsSource(DBOptions dbOptions, long waitTimeMs) {
    this.dbOptions = dbOptions;
    this.waitTimeMs = waitTimeMs;
}


@Override
public void stop() {
    this.isRunning = false;
}

@Override
public void run(SourceContext sourceContext) throws Exception {
    while (isRunning) {
        // Statistics must have been enabled on the options up front
        // (e.g. in MyOptionsFactory); otherwise statistics() returns null.
        Statistics statistics = dbOptions.statistics();
        if (statistics != null) {
            // emit the current statistics dump downstream
            sourceContext.collect(statistics.toString());
        }
        Thread.sleep(waitTimeMs);
    }
}

@Override
public void cancel() {
    this.isRunning = false;
}
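If the source ends up emitting the raw statistics dump as a String, each ticker line looks roughly like `rocksdb.block.cache.miss COUNT : 5` (format assumed from RocksDB's Statistics#toString(); worth verifying against the RocksJava version in use). A small parser could split such lines into name/value pairs before indexing:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class TickerLineParser {

    // Parse one ticker line of the assumed form "name COUNT : value"
    // into a (name, value) pair; returns null for lines that don't
    // match (e.g. histogram lines or headers).
    public static Map.Entry<String, Long> parse(String line) {
        String[] parts = line.trim().split("\\s+");
        if (parts.length == 4 && parts[1].equals("COUNT") && parts[2].equals(":")) {
            try {
                return new SimpleEntry<>(parts[0], Long.parseLong(parts[3]));
            } catch (NumberFormatException e) {
                return null;
            }
        }
        return null;
    }
}
```

Parsing on the source side keeps the Elasticsearch documents structured (one metric per document) instead of one big text blob per poll.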

I am assuming that we will get a separate RocksDB options object for each
of the slots. Is this a good way to approach this problem? Do you think
this will work?

Thanks in advance! :)
-- 

Regards,
Harshvardhan Agrawal
