[ 
https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15922909#comment-15922909
 ] 

Stephan Ewen commented on FLINK-5756:
-------------------------------------

Thanks for opening this and sharing the test results.
I agree that the performance of RocksDB is not optimal and that we would like 
to get better performance out of the state backend. In general, RocksDB is 
heavily optimized for writes and for small values. Larger values (as you get 
with the merge) perform very bad.

Here are a few things we can do and have already started doing:

*Improve the other state backends*

  - We are currently making the in-memory state backend (object data) much 
stronger, with async snapshots (see FLINK-5715 )
  - It makes sense to eventually build an own state backend that operators on 
serialized data with managed memory

*Optimize the RocksDB State Backend*

  - We can try an avoid RocksDB's merge operation and instead use range 
iterators for ListState.
  - Quick benchmark of the same task in that approach gives *91ms* insert time 
and *35ms* get() time. That looks like worth exploring.


*A tip to improve your benchmark*
  - Try to move all string operations out of the test loop. Prepare the bytes 
before and then call the RocksDB functions.
  - I redid the benchmark with the code below and it took *20* seconds to get 
the result of a merge. Still a lot of time...


*Code for range-iterator mini-benchmark*
{code}
                final File rocksDir = new File("/tmp/rdb");
                FileUtils.deleteDirectory(rocksDir);

                final Options options = new Options()
                                .setCompactionStyle(CompactionStyle.LEVEL)
                                .setLevelCompactionDynamicLevelBytes(true)
                                .setIncreaseParallelism(4)
                                .setUseFsync(false)
                                .setMaxOpenFiles(-1)
                                .setAllowOsBuffer(true)
                                .setDisableDataSync(true)
                                .setCreateIfMissing(true)
                                .setMergeOperator(new StringAppendOperator());

                final WriteOptions write_options = new WriteOptions()
                                .setSync(false)
                                .setDisableWAL(true);

                final RocksDB rocksDB = RocksDB.open(options, 
rocksDir.getAbsolutePath());

                final String key = "key";
                final String value = 
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";

                final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
                final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);

                final byte[] keyTemplate = Arrays.copyOf(keyBytes, 
keyBytes.length + 4);

                final Unsafe unsafe = MemoryUtils.UNSAFE;
                final long offset = unsafe.arrayBaseOffset(byte[].class) + 
keyTemplate.length - 4;

                final int num = 50000;
                System.out.println("begin insert");

                final long beginInsert = System.nanoTime();
                for (int i = 0; i < num; i++) {
                        unsafe.putInt(keyTemplate, offset, i);
                        rocksDB.put(write_options, keyTemplate, valueBytes);
                }
                final long endInsert = System.nanoTime();
                System.out.println("end insert - duration: " + ((endInsert - 
beginInsert) / 1_000_000) + " ms");

                final byte[] resultHolder = new byte[num * valueBytes.length];

                final long beginGet = System.nanoTime();

                final RocksIterator iterator = rocksDB.newIterator();
                int pos = 0;

                // seek to start
                unsafe.putInt(keyTemplate, offset, 0);
                iterator.seek(keyTemplate);

                // mark end
                unsafe.putInt(keyTemplate, offset, -1);

                // iterate
                while (iterator.isValid()) {
                        byte[] currKey = iterator.key();
                        if (sameKey(keyBytes, currKey)) {
                                byte[] currValue = iterator.value();
                                System.arraycopy(currValue, 0, resultHolder, 
pos, currValue.length);
                                pos += currValue.length;
                                iterator.next();
                        }
                        else {
                                break;
                        }
                }
                
                final long endGet = System.nanoTime();

                System.out.println("end get - duration: " + ((endGet - 
beginGet) / 1_000_000) + " ms");
{code}

> When there are many values under the same key in ListState, 
> RocksDBStateBackend performances poor
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-5756
>                 URL: https://issues.apache.org/jira/browse/FLINK-5756
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.0
>         Environment: CentOS 7.2
>            Reporter: Syinchwun Leo
>
> When using RocksDB as the StateBackend, if there are many values under the 
> same key in ListState, the windowState.get() operator performances very poor. 
> I also the the RocksDB using version 4.11.2, the performance is also very 
> poor. The problem is likely to related to RocksDB itself's get() operator 
> after using merge(). The problem may influences the window operation's 
> performance when the size is very large using ListState. I try to merge 50000 
> values under the same key in RocksDB, It costs 120 seconds to execute get() 
> operation.
> ///////////////////////////////////////////////////////////////////////////////
> The flink's code is as follows:    
> {code}
> class SEventSource extends RichSourceFunction [SEvent] {
>   private var count = 0L
>   private val alphabet = 
> "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
>   override def run(sourceContext: SourceContext[SEvent]): Unit = {
>     while (true) {
>       for (i <- 0 until 5000) {
>         sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1))
>         count += 1L
>       }
>       Thread.sleep(1000)
>     }
>   }
> }
> env.addSource(new SEventSource)
>       .assignTimestampsAndWatermarks(new 
> AssignerWithPeriodicWatermarks[SEvent] {
>         override def getCurrentWatermark: Watermark = {
>           new Watermark(System.currentTimeMillis())
>         }
>         override def extractTimestamp(t: SEvent, l: Long): Long = {
>           System.currentTimeMillis()
>         }
>       })
>       .keyBy(0)
>       .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2)))
>       .apply(new WindowStatistic)
>       .map(x => (System.currentTimeMillis(), x))
>       .print()
> {code}
> ////////////////////////////////////
> The RocksDB Test code:    
> {code}
> val stringAppendOperator = new StringAppendOperator
>     val options = new Options()
>     options.setCompactionStyle(CompactionStyle.LEVEL)
>       .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
>       .setLevelCompactionDynamicLevelBytes(true)
>       .setIncreaseParallelism(4)
>       .setUseFsync(true)
>       .setMaxOpenFiles(-1)
>       .setCreateIfMissing(true)
>       .setMergeOperator(stringAppendOperator)
>     val write_options = new WriteOptions
>     write_options.setSync(false)
>     val rocksDB = RocksDB.open(options, "/******/Data/")
>     val key = "key"
>     val value = 
> "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"
>     val beginmerge = System.currentTimeMillis()
>     for(i <- 0 to 50000) {
>       rocksDB.merge(key.getBytes(), ("s"+ i + value).getBytes())
>       //rocksDB.put(key.getBytes, value.getBytes)
>     }
>     println("finish")
>     val begin = System.currentTimeMillis()
>     rocksDB.get(key.getBytes)
>     val end = System.currentTimeMillis()
>     println("merge cost:" + (begin - beginmerge))
>     println("Time consuming:" + (end - begin))
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to