[ 
https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15923530#comment-15923530
 ] 

shijinkui commented on FLINK-5756:
----------------------------------

[~StephanEwen] Thanks for your reply. [~SyinchwunLeo] Please test the 
mini-benchmark.
FLINK-5715 is nice.

> When there are many values under the same key in ListState, 
> RocksDBStateBackend performs poorly
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-5756
>                 URL: https://issues.apache.org/jira/browse/FLINK-5756
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.0
>         Environment: CentOS 7.2
>            Reporter: Syinchwun Leo
>
> When RocksDB is used as the StateBackend and there are many values under the 
> same key in ListState, windowState.get() performs very poorly. 
> I also tested RocksDB version 4.11.2, and its performance is also very 
> poor. The problem is likely related to RocksDB's own get() operation 
> after merge() has been used. It may affect the performance of window 
> operations that keep very large windows in ListState. When I merged 50000 
> values under the same key in RocksDB, the get() operation took 120 seconds 
> to execute.
> ///////////////////////////////////////////////////////////////////////////////
> The Flink code is as follows:    
> {code}
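> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
> import org.apache.flink.streaming.api.functions.source.RichSourceFunction
> import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
> import org.apache.flink.streaming.api.watermark.Watermark
> import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
> import org.apache.flink.streaming.api.windowing.time.Time
>
> // SEvent, WindowStatistic and env (the StreamExecutionEnvironment) are
> // defined elsewhere in the job and are not shown in this report.
> class SEventSource extends RichSourceFunction[SEvent] {
>   @volatile private var running = true
>   private var count = 0L
>   private val alphabet =
>     "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
>   override def run(sourceContext: SourceContext[SEvent]): Unit = {
>     // Emit 5000 events under the same key (1) once per second.
>     while (running) {
>       for (i <- 0 until 5000) {
>         sourceContext.collect(SEvent(1, "hello-" + count, alphabet, 1))
>         count += 1L
>       }
>       Thread.sleep(1000)
>     }
>   }
>   // SourceFunction requires cancel(); it stops the emit loop.
>   override def cancel(): Unit = running = false
> }
> env.addSource(new SEventSource)
>   .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] {
>     override def getCurrentWatermark: Watermark = {
>       new Watermark(System.currentTimeMillis())
>     }
>     override def extractTimestamp(t: SEvent, l: Long): Long = {
>       System.currentTimeMillis()
>     }
>   })
>   .keyBy(0)
>   // 20-second windows sliding every 2 seconds, so each event lands in 10 windows.
>   .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2)))
>   .apply(new WindowStatistic)
>   .map(x => (System.currentTimeMillis(), x))
>   .print()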
> {code}
> ////////////////////////////////////
> The RocksDB test code:    
> {code}
> import org.rocksdb._
>
> // The enclosing object and main method were cut from the report;
> // the name RocksDBMergeTest is a placeholder.
> object RocksDBMergeTest {
>   def main(args: Array[String]): Unit = {
>     val stringAppendOperator = new StringAppendOperator
>     val options = new Options()
>     options.setCompactionStyle(CompactionStyle.LEVEL)
>       .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
>       .setLevelCompactionDynamicLevelBytes(true)
>       .setIncreaseParallelism(4)
>       .setUseFsync(true)
>       .setMaxOpenFiles(-1)
>       .setCreateIfMissing(true)
>       .setMergeOperator(stringAppendOperator)
>     val write_options = new WriteOptions
>     write_options.setSync(false)
>     val rocksDB = RocksDB.open(options, "/******/Data/")
>     val key = "key"
>     val value =
>       "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"
>     val beginmerge = System.currentTimeMillis()
>     // Merge 50000 operands under the same key.
>     for (i <- 0 until 50000) {
>       rocksDB.merge(write_options, key.getBytes(), ("s" + i + value).getBytes())
>       //rocksDB.put(key.getBytes, value.getBytes)
>     }
>     println("finish")
>     // Time a single get() of the merged value.
>     val begin = System.currentTimeMillis()
>     rocksDB.get(key.getBytes)
>     val end = System.currentTimeMillis()
>     println("merge cost:" + (begin - beginmerge))
>     println("Time consuming:" + (end - begin))
>   }
> }
> {code}
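
One possible explanation for the slow get() after many merge() calls, stated here as an assumption and not confirmed in this issue, is that the string-append merge operator combines operands pairwise, so every append copies the whole accumulated value again. The sketch below only models that copying cost in plain Scala. It does not call RocksDB, and the names MergeCostSketch, pairwiseAppendBytesCopied and singlePassBytesCopied are hypothetical.

```scala
object MergeCostSketch {
  // Bytes copied if operands are appended to the accumulated value one at a
  // time: each step copies everything accumulated so far plus the new operand.
  // (Assumed model of a pairwise append merge, not RocksDB's actual code.)
  def pairwiseAppendBytesCopied(operandSizes: Seq[Int], delimiterSize: Int = 1): Long = {
    var accumulated = 0L
    var copied = 0L
    var first = true
    for (size <- operandSizes) {
      // The first operand has no delimiter in front of it.
      accumulated += (if (first) size else delimiterSize + size)
      first = false
      copied += accumulated
    }
    copied
  }

  // Bytes copied if all operands are concatenated in one pass into a
  // buffer of the final size.
  def singlePassBytesCopied(operandSizes: Seq[Int], delimiterSize: Int = 1): Long =
    if (operandSizes.isEmpty) 0L
    else operandSizes.map(_.toLong).sum + delimiterSize.toLong * (operandSizes.size - 1)

  def main(args: Array[String]): Unit = {
    // 50000 operands of about 78 bytes each, roughly matching the test above
    // (the 72-character value plus the "s" + index prefix).
    val sizes = Seq.fill(50000)(78)
    val pairwise = pairwiseAppendBytesCopied(sizes)
    val single = singlePassBytesCopied(sizes)
    println("pairwise append copies (bytes): " + pairwise)
    println("single pass copies (bytes):     " + single)
    println("ratio: " + pairwise / single)
  }
}
```

Under this model the pairwise cost grows quadratically with the number of operands while the single-pass cost grows linearly. Whether this is what dominates the 120 seconds reported above would need to be checked with the mini-benchmark.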



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
