Hi

The option of 'setCacheIndexAndFilterBlocks' is used to ensure we could manage 
the memory usage of RocksDB, could you share logs or more descriptions why 
setCacheIndexAndFilterBlocks seems to make the hash index not work properly?

I guess this might due to the index and filter is more likely to be pop out 
with the competition of data blocks [1], although Flink has tried its best to 
minimize the regression. Please consider to increase the total block cache size 
or decrease state.backend.rocksdb.memory.write-buffer-ratio [2]

[1] 
https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-filter-and-compression-dictionary-blocks
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/state_backends.html#memory-management

Best
Yun Tang
________________________________
From: ירון שני <yaron.sh...@gmail.com>
Sent: Tuesday, September 29, 2020 17:49
To: Yun Tang <myas...@live.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Poor performance with large keys using RocksDB and MapState

Thanks Yun!,
I used this option, and it greatly helped

2:44<https://xmcyber.slack.com/archives/DP2KLMWUX/p1600947862026600>

val be = new RocksDBStateBackend("file:///tmp")class MyConfig extends 
DefaultConfigurableOptionsFactory {  override def 
createColumnOptions(currentOptions: ColumnFamilyOptions, handlesToClose: 
util.Collection[AutoCloseable]): ColumnFamilyOptions = {
    super.createColumnOptions(currentOptions, 
handlesToClose).optimizeForPointLookup(2000)
  }
}
be.setRocksDBOptions(new MyConfig)
be.getMemoryConfiguration.setUseManagedMemory(false)

But now I cant use the RocksDBSharedResources because of 
setCacheIndexAndFilterBlocks seems to make the hash index not work properly and 
the performance is bad again.
Only when using  be.getMemoryConfiguration.setUseManagedMemory(false) and 
skipping setCacheIndexAndFilterBlocks , only then its working :(





On Fri, Sep 25, 2020 at 9:56 AM Yun Tang 
<myas...@live.com<mailto:myas...@live.com>> wrote:
Hi

If you want to improve the performance of point lookup, you could try to use 
additional hash index. This feature needs to pass a prefix extractor, however, 
original interface is not exposed out directly in java API.

You could try to call 
columnFamilyOptions.optimizeForPointLookup(blockCacheSizeMb) and it would use 
NoopTransform prefix extractor by default[1].
Please also consider to use this feature after Flink-1.10.2 due to RocksDB 
internal bug [2].

[1] 
https://github.com/dataArtisans/frocksdb/blob/c724d41fab7f9f09f9676dfccc6d210a191da4d6/options/options.cc#L477
[2] https://issues.apache.org/jira/browse/FLINK-17800

Best
Yun Tang


________________________________
From: ירון שני <yaron.sh...@gmail.com<mailto:yaron.sh...@gmail.com>>
Sent: Wednesday, September 23, 2020 23:56
To: user@flink.apache.org<mailto:user@flink.apache.org> 
<user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Poor performance with large keys using RocksDB and MapState

Hello,
I have a poor throughput issue, and I think I managed to reproduce it using the 
following code:

val conf = new Configuration()
conf.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ofMebiBytes(6 * 
1000))
conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(8 * 
1000))
conf.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.ofMebiBytes(256))
conf.set(RocksDBConfigurableOptions.BLOCK_SIZE, new MemorySize(8 * 1000))

val be = new RocksDBStateBackend("file:///tmp")
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
  .setStateBackend(be)


env.setParallelism(3)
env.getConfig.enableObjectReuse()

val r = new scala.util.Random(31)
    val randStr = r.nextString(4992)
    val s = env.fromElements(1).process((value: Int, ctx: 
_root_.org.apache.flink.streaming.api.functions.ProcessFunction[Int, 
_root_.scala.Predef.String]#Context, out: 
_root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]) => {
      for (a <- 1 to 1000 * 1000 * 10) {
        out.collect( randStr + r.nextString(8) )

      }
    }).keyBy(a=>a).process(new ProcessFunction[String, String] {
      private var someState: MapState[String, String] = _

      override def open(parameters: Configuration): Unit = {
        someState = getRuntimeContext.getMapState(
          new MapStateDescriptor[String, String]("someState", 
createTypeInformation[String], createTypeInformation[String])
        )
      }

      override def processElement(value: _root_.scala.Predef.String, ctx: 
_root_.org.apache.flink.streaming.api.functions.ProcessFunction[_root_.scala.Predef.String,
 _root_.scala.Predef.String]#Context, out: 
_root_.org.apache.flink.util.Collector[_root_.scala.Predef.String]): Unit = {
        if(!someState.contains(value)) {
          someState.put(value, value)
        }
      }
})

env.execute()

This has really poor throughput.
Now changing
out.collect( randStr + r.nextString(8) )

to
out.collect( r.nextString(8) + randStr)
Solves the issue.
Is there any way easy to fix this?
I tried to use hash index, but it required rocks db option called "prefix 
extractor" which I don't know how to fill yet, and no idea if it will fix it.
If anyone encountered that before, I would really use some advice/help. Thanks!







Reply via email to