The DBOptions created by the OptionsFactory are used to open RocksDB; however, 
calling 'RocksDBStateBackend#getDbOptions()' does not return that exact 
DBOptions instance but a newly created one. The private 'dbOptions' field 
within `RocksDBKeyedStateBackend` is the one you actually want.

Best
Yun Tang
________________________________
From: Harshvardhan Agrawal <harshvardhan.ag...@gmail.com>
Sent: Friday, February 1, 2019 1:35
To: Yun Tang
Cc: user
Subject: Re: Writing a custom Rocksdb statistics collector

It looks like the DBOptions that are created by the OptionsFactory class are 
used for opening RocksDB.

And yes, I missed the fact that DBOptions is not serializable. Thanks for 
pointing that out. I will go through the metrics exposed via Flink. But does 
this mean that there is no good way of getting native RocksDB metrics in Flink?

On Wed, Jan 30, 2019 at 23:07 Yun Tang 
<myas...@live.com> wrote:
Hi Harshvardhan

First of all, 'DBOptions' is not serializable, so I think you cannot include it 
in the source constructor.
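To make the failure concrete, here is a small, Flink-free sketch; `FakeDbOptions` is a hypothetical stand-in for the JNI-backed `DBOptions`. Flink ships operators to the task managers via Java serialization, so a source holding such a field fails at job submission time:

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for DBOptions: a native-backed handle, not Serializable.
class FakeDbOptions { }

// A source-style class that tries to carry the handle in a plain field.
class StatsSource implements Serializable {
    FakeDbOptions options = new FakeDbOptions(); // non-serializable field
}

public class SerializationDemo {
    public static void main(String[] args) throws Exception {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream())
                    .writeObject(new StatsSource());
            System.out.println("serialized");
        } catch (NotSerializableException e) {
            // This is the error you would hit when Flink serializes the source.
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

The usual workarounds are marking such a field `transient` and re-acquiring the handle in `open()`, which is exactly why passing DBOptions through the constructor is problematic.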

I also wonder whether the given `DBOptions` could be used to query RocksDB's 
statistics, since it is not the actual options instance used to open RocksDB.

We have tried reporting RocksDB's statistics each time the RocksDB 
state backend takes a snapshot, but that solution means you have to modify the 
RocksDB state backend's source code. By the way, Flink supports reporting some 
native metrics [1]; I hope this is helpful.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#rocksdb-native-metrics
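For reference, the native metrics in [1] are enabled per metric in flink-conf.yaml; two example keys as of Flink 1.7 (see the linked page for the full list):

```yaml
# Each native RocksDB metric is opt-in; enabling one registers the
# corresponding gauge in Flink's metrics system per column family.
state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.num-running-compactions: true
```

Once enabled, these gauges flow through whatever metrics reporter the cluster is configured with, so they could reach Elasticsearch without a custom source.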

Best
Yun Tang
________________________________
From: Harshvardhan Agrawal 
<harshvardhan.ag...@gmail.com>
Sent: Thursday, January 31, 2019 0:23
To: user
Subject: Writing a custom Rocksdb statistics collector


Hi,

I am currently trying to integrate RocksDB statistics in my pipeline.

The basic idea is that we want to pass RocksDB stats through the same pipeline 
that is doing our processing and write them to Elasticsearch so that we can 
visualize them in Kibana.

I have written a custom source function that takes the DBOptions object from 
the stream environment's state backend; the source function then uses this 
DBOptions object to continuously query RocksDB for metrics. Here's the code:

public class RocksDBStatsStreamRunner {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend("/tmp", true);
        rocksDBStateBackend.setOptions(new MyOptionsFactory());
        streamEnv.setStateBackend(rocksDBStateBackend);

        DBOptions dbOptions =
                ((RocksDBStateBackend) streamEnv.getStateBackend()).getDbOptions();
        streamEnv.addSource(new RocksDBStatisticsSource(dbOptions));

        // ... attach the rest of the pipeline and a sink here ...
        streamEnv.execute("RocksDB statistics collector");
    }
}


public class RocksDBStatisticsSource implements SourceFunction<RocksDBStats> {

    private static final long DEFAULT_SLEEP_TIME_MS = 1000L;

    private final DBOptions dbOptions;
    private final long waitTimeMs;
    private volatile boolean isRunning = true;

    public RocksDBStatisticsSource(DBOptions dbOptions) {
        this(dbOptions, DEFAULT_SLEEP_TIME_MS);
    }

    public RocksDBStatisticsSource(DBOptions dbOptions, long waitTimeMs) {
        this.dbOptions = dbOptions;
        this.waitTimeMs = waitTimeMs;
    }

    public void stop() {
        this.isRunning = false;
    }

    @Override
    public void run(SourceContext<RocksDBStats> sourceContext) throws Exception {
        while (isRunning) {
            //create rocksdb statistics object
            //query rocksdb for statistics using the options field
            //sourceContext.collect(rocksdbStats object)
            //sleep
            Thread.sleep(waitTimeMs);
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }
}
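Stripped of the Flink types, the run/cancel contract above boils down to a volatile running flag checked between sleeps. A minimal, Flink-free sketch of that pattern (class and method names here are illustrative, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the source's run()/cancel() contract: a volatile flag
// checked between polls, with a sleep to pace the queries.
class StatsPoller implements Runnable {
    private volatile boolean running = true; // volatile: cancel() runs on another thread
    private final long waitTimeMs;
    final List<String> collected = new ArrayList<>();

    StatsPoller(long waitTimeMs) { this.waitTimeMs = waitTimeMs; }

    public void cancel() { running = false; }

    @Override
    public void run() {
        while (running) {
            collected.add("stats-sample");        // stand-in for querying RocksDB
            try {
                Thread.sleep(waitTimeMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and exit
                return;
            }
        }
    }
}

public class PollerDemo {
    public static void main(String[] args) throws Exception {
        StatsPoller poller = new StatsPoller(10);
        Thread t = new Thread(poller);
        t.start();
        Thread.sleep(100);
        poller.cancel();
        t.join();
        System.out.println("polled " + poller.collected.size() + " times");
    }
}
```

The `volatile` keyword matters: without it, the polling thread may never observe the flag flip made by `cancel()` from another thread.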

I am assuming that we will get a separate RocksDB options object for each of 
the slots. Is this a good way to approach this problem? Do you think this will 
work?

Thanks in advance! :)
--
Regards,
Harshvardhan Agrawal
