Thanks Yun!
I used this option, and it helped a lot.


val be = new RocksDBStateBackend("file:///tmp")

class MyConfig extends DefaultConfigurableOptionsFactory {
  override def createColumnOptions(
      currentOptions: ColumnFamilyOptions,
      handlesToClose: util.Collection[AutoCloseable]): ColumnFamilyOptions = {
    super.createColumnOptions(currentOptions, handlesToClose)
      .optimizeForPointLookup(2000)
  }
}

be.setRocksDBOptions(new MyConfig)
be.getMemoryConfiguration.setUseManagedMemory(false)


But now I can't use the RocksDBSharedResources, because
setCacheIndexAndFilterBlocks seems to stop the hash index from working
properly, and the performance is bad again.
It only works when I use be.getMemoryConfiguration.setUseManagedMemory(false)
and skip setCacheIndexAndFilterBlocks :(
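
For context, here is a rough sketch of why I suspect the key layout in my repro matters so much when the hash index is not effective. It is not RocksDB code: bytesInspected is a hypothetical helper that counts how many positions a plain lexicographic byte comparison has to look at, and the byte arrays are stand-ins for randStr and r.nextString(8). I am assuming lookups fall back to ordered key comparisons in that case.

```scala
object PrefixCompareSketch {
  // Number of byte positions a lexicographic (memcmp-style) comparison
  // must inspect before it can decide the order of a and b.
  def bytesInspected(a: Array[Byte], b: Array[Byte]): Int = {
    val limit = math.min(a.length, b.length)
    var i = 0
    while (i < limit && a(i) == b(i)) i += 1
    // Count the first differing byte as inspected too, if there is one.
    if (i < limit) i + 1 else limit
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for randStr: a long part shared by every key.
    val shared = Array.fill[Byte](4992)(7)
    // Stand-ins for r.nextString(8): short parts that differ per key.
    val tailA = Array[Byte](1, 2, 3, 4, 5, 6, 7, 8)
    val tailB = Array[Byte](9, 2, 3, 4, 5, 6, 7, 8)

    // Shared part first: the comparison walks the whole shared prefix.
    println(bytesInspected(shared ++ tailA, shared ++ tailB)) // prints 4993
    // Unique part first: the comparison stops at the very first byte.
    println(bytesInspected(tailA ++ shared, tailB ++ shared)) // prints 1
  }
}
```

If that assumption holds, every comparison on the "randStr first" keys pays for the full shared prefix, which would match what I see when the order is swapped.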





On Fri, Sep 25, 2020 at 9:56 AM Yun Tang <myas...@live.com> wrote:

> Hi
>
> If you want to improve the performance of point lookups, you could try
> an additional hash index. This feature needs a prefix extractor to be
> passed in; however, the original interface is not exposed directly in the
> Java API.
>
> You could try calling
> columnFamilyOptions.optimizeForPointLookup(blockCacheSizeMb), which
> uses the NoopTransform prefix extractor by default [1].
> Please also consider using this feature after Flink-1.10.2, due to a
> RocksDB internal bug [2].
>
> [1]
> https://github.com/dataArtisans/frocksdb/blob/c724d41fab7f9f09f9676dfccc6d210a191da4d6/options/options.cc#L477
> [2] https://issues.apache.org/jira/browse/FLINK-17800
>
> Best
> Yun Tang
>
>
> ------------------------------
> *From:* ירון שני <yaron.sh...@gmail.com>
> *Sent:* Wednesday, September 23, 2020 23:56
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Poor performance with large keys using RocksDB and MapState
>
> Hello,
> I have a poor throughput issue, and I think I managed to reproduce it
> using the following code:
>
> val conf = new Configuration()
> conf.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ofMebiBytes(6 * 1000))
> conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(8 * 1000))
> conf.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.ofMebiBytes(256))
> conf.set(RocksDBConfigurableOptions.BLOCK_SIZE, new MemorySize(8 * 1000))
>
> val be = new RocksDBStateBackend("file:///tmp")
> val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
>   .setStateBackend(be)
>
> env.setParallelism(3)
> env.getConfig.enableObjectReuse()
>
> val r = new scala.util.Random(31)
> val randStr = r.nextString(4992)
> val s = env.fromElements(1)
>   .process((value: Int, ctx: ProcessFunction[Int, String]#Context, out: Collector[String]) => {
>     for (a <- 1 to 1000 * 1000 * 10) {
>       out.collect(randStr + r.nextString(8))
>     }
>   })
>   .keyBy(a => a)
>   .process(new ProcessFunction[String, String] {
>     private var someState: MapState[String, String] = _
>
>     override def open(parameters: Configuration): Unit = {
>       someState = getRuntimeContext.getMapState(
>         new MapStateDescriptor[String, String](
>           "someState", createTypeInformation[String], createTypeInformation[String])
>       )
>     }
>
>     override def processElement(
>         value: String,
>         ctx: ProcessFunction[String, String]#Context,
>         out: Collector[String]): Unit = {
>       if (!someState.contains(value)) {
>         someState.put(value, value)
>       }
>     }
>   })
>
> env.execute()
>
> This has really poor throughput.
> Now, changing
> out.collect(randStr + r.nextString(8))
>
> to
> out.collect(r.nextString(8) + randStr)
> solves the issue.
> Is there any easy way to fix this?
> I tried to use a hash index, but it requires a RocksDB option called "prefix
> extractor", which I don't know how to fill in yet, and I have no idea whether
> it will fix it.
> If anyone has encountered this before, I could really use some advice/help.
> Thanks!
>
>
>
>
>
>
>
>
