Hi,

We are using flink version 1.10.1
The task manager memory 16GB
The number of slots is 32 but the job parallelism is 1.
We used the default configuration for rocksdb.
We checked the disk speed on the machine running the task manager: Write
300MB and read 1GB

BR,
Nick

‫בתאריך יום ג׳, 16 ביוני 2020 ב-12:12 מאת ‪Yun Tang‬‏ <‪myas...@live.com
‬‏>:‬

> Hi Nick
>
> As you might know, RocksDB suffers not so good performance for
> iterator-like operations due to it needs to merge sort for multi levels. [1]
>
> Unfortunately, rocksDBMapState.isEmpty() needs to call iterator and seek
> operations over rocksDB [2], and rocksDBMapState.clear() needs to iterator
> over state and remove entry [3].
> However, even these operations behaves not so good, I don't think they
> would behave extremely bad in general case. From our experience on SSD, the
> latency of seek should be less than 100us
> and could go up to hundreds of us, did you use SSD disk?
>
>    1. What is the Flink version, taskmanager memory, number of slots and
>    RocksDB related configurations?
>    2. Have you checked the IOPS, disk util for those machines which
>    containing task manager running RocksDB?
>
>
> [1] https://github.com/facebook/rocksdb/wiki/Iterator-Implementation
> [2]
> https://github.com/apache/flink/blob/efd497410ced3386b955a92b731a8e758223045f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L241
> [3]
> https://github.com/apache/flink/blob/efd497410ced3386b955a92b731a8e758223045f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L254
>
> Best
> Yun Tang
>
> ------------------------------
> *From:* nick toker <nick.toker....@gmail.com>
> *Sent:* Tuesday, June 16, 2020 15:35
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* MapState bad performance
>
> Hello,
>
> We wrote a very simple streaming pipeline containing:
> 1. Kafka consumer
> 2. Process function
> 3. Kafka producer
>
> The code of the process function is listed below:
>
> private transient MapState<String, Object> testMapState;
>
> @Override
>     public void processElement(Map<String, Object> value, Context ctx, 
> Collector<Map<String, Object>> out) throws Exception {
>
>             if (testMapState.isEmpty()) {
>
>                 testMapState.putAll(value);
>
>                 out.collect(value);
>
>                 testMapState.clear();
>             }
>         }
>
> We faced very bad performance and then we made some tests using jprofiler.
> Using jprofiler, we saw that the hot spots are 2 functions of the MapState:
> 1. isEmpty() - around 7 ms
> 2. clear() - around 4 ms
>
> We had to change and use ValueState instead.
>
> Are we using the MapState in the correct way or are we doing something
> wrong ?
> Is this behaviour expected because flink  recommendations are to use
> MapState and NOT ValueState ?
>
> BR,
> Nick
>

Reply via email to