Re: MapState bad performance

2020-06-20 Thread Congxian Qiu
Hi Nick

Sorry for jumping in late.

Just wondering why you call RocksDBMapState#putAll() and then immediately
follow it with RocksDBMapState#clear(). It seems the state will always be
empty after processing.

Best,
Congxian


On Tue, Jun 16, 2020 at 7:42 PM Yun Tang wrote:

> Hi Nick
>
> From my experience, it's not easy to tune this without code that reproduces
> the issue. Could you please share a reproducible example with a fake source
> so that we can help you?
>
> If CPU usage is at 100% in RocksDB-related methods, it might be because we
> access RocksDB too often. If CPU usage is not 100% while disk utilization is
> 100%, we have likely hit the performance limit of the disk.
>
> BTW, with a 16GB-memory TM and 32 slots, each slot would only get about
> 150MB of managed memory [1][2] for RocksDB, which looks a bit small.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/memory/mem_setup.html#managed-memory
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/memory/mem_tuning.html#rocksdb-state-backend
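>
> As a rough illustration only (assuming the Flink 1.10 defaults, e.g.
> taskmanager.memory.managed.fraction = 0.4; the exact numbers depend on your
> configuration):
>
>     total process memory        ~ 16 GB
>     total Flink memory          ~ 16 GB - JVM overhead/metaspace ~ 14.7 GB
>     managed memory              ~ 0.4 * 14.7 GB ~ 5.9 GB
>     managed memory per slot     ~ 5.9 GB / 32 slots ~ 180 MB
>
> Since RocksDB is bounded by the per-slot managed memory by default, each
> slot's RocksDB instance only has on the order of 150-200 MB to work with.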
>
> Best
> Yun Tang
>
>
> --
> *From:* nick toker 
> *Sent:* Tuesday, June 16, 2020 18:36
> *To:* Yun Tang 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: MapState bad performance
>
> Hi,
>
> We are using Flink version 1.10.1.
> The task manager memory is 16GB.
> The number of slots is 32, but the job parallelism is 1.
> We used the default configuration for RocksDB.
> We checked the disk speed on the machine running the task manager: write
> 300MB/s and read 1GB/s.
>
> BR,
> Nick
>
> On Tuesday, June 16, 2020 at 12:12, Yun Tang <myas...@live.com> wrote:
>
> Hi Nick
>
> As you might know, RocksDB does not perform well for iterator-like
> operations, because it needs to merge-sort across multiple levels. [1]
>
> Unfortunately, rocksDBMapState.isEmpty() needs to issue iterator and seek
> operations over RocksDB [2], and rocksDBMapState.clear() needs to iterate
> over the state and remove each entry [3].
> However, even though these operations do not perform well, I don't think they
> would behave extremely badly in the general case. From our experience on SSD,
> the latency of a seek should be less than 100us and can go up to a few
> hundred us. Did you use an SSD disk?
>
>    1. What are the Flink version, taskmanager memory, number of slots and
>    RocksDB-related configurations?
>    2. Have you checked the IOPS and disk utilization of the machines whose
>    task managers run RocksDB?
>
>
> [1] https://github.com/facebook/rocksdb/wiki/Iterator-Implementation
> [2]
> https://github.com/apache/flink/blob/efd497410ced3386b955a92b731a8e758223045f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L241
> [3]
> https://github.com/apache/flink/blob/efd497410ced3386b955a92b731a8e758223045f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java#L254
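>
> If the isEmpty() call itself turns out to be the main cost, one possible
> mitigation (just a sketch, assuming a keyed stream with String key/value
> types; all names here are illustrative) is to track emptiness in a
> ValueState<Boolean> flag, whose value() is a single point lookup, instead of
> asking the MapState:
>
> import java.util.Map;
>
> import org.apache.flink.api.common.state.MapState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
>
> public class FlagBasedProcessFunction extends
>         KeyedProcessFunction<String, Map<String, String>, Map<String, String>> {
>
>     private transient MapState<String, String> testMapState;
>     // Cheap emptiness flag: a single RocksDB get instead of an iterator seek.
>     private transient ValueState<Boolean> mapNonEmpty;
>
>     @Override
>     public void open(Configuration parameters) {
>         testMapState = getRuntimeContext().getMapState(
>                 new MapStateDescriptor<>("testMap", String.class, String.class));
>         mapNonEmpty = getRuntimeContext().getState(
>                 new ValueStateDescriptor<>("mapNonEmpty", Boolean.class));
>     }
>
>     @Override
>     public void processElement(Map<String, String> value, Context ctx,
>             Collector<Map<String, String>> out) throws Exception {
>         if (mapNonEmpty.value() == null) {  // replaces testMapState.isEmpty()
>             testMapState.putAll(value);
>             mapNonEmpty.update(true);
>             out.collect(value);
>             testMapState.clear();           // still iterates over the entries
>             mapNonEmpty.clear();
>         }
>     }
> }
>
> This only removes the seek from the emptiness check; clear() keeps its
> iterator-based cost as long as a MapState is involved.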
>
> Best
> Yun Tang
>
> --
> *From:* nick toker 
> *Sent:* Tuesday, June 16, 2020 15:35
> *To:* user@flink.apache.org 
> *Subject:* MapState bad performance
>
> Hello,
>
> We wrote a very simple streaming pipeline containing:
> 1. Kafka consumer
> 2. Process function
> 3. Kafka producer
>
> The code of the process function is listed below:
>
> // Key/value types are assumed to be String here for illustration.
> private transient MapState<String, String> testMapState;
>
> @Override
> public void processElement(Map<String, String> value, Context ctx,
>         Collector<Map<String, String>> out) throws Exception {
>
>     if (testMapState.isEmpty()) {
>         testMapState.putAll(value);
>         out.collect(value);
>         testMapState.clear();
>     }
> }
>
> We faced very bad performance, so we ran some tests with JProfiler.
> In JProfiler we saw that the hot spots are two functions of the MapState:
> 1. isEmpty() - around 7 ms
> 2. clear() - around 4 ms
>
> We had to switch to ValueState instead.
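>
> For reference, a minimal sketch of such a ValueState-based variant (the
> key/value types are illustrative, and the whole map is stored as one
> serialized value):
>
> // In addition to the types above, this needs
> // org.apache.flink.api.common.typeinfo.TypeInformation and TypeHint.
> private transient ValueState<Map<String, String>> testValueState;
>
> @Override
> public void open(Configuration parameters) {
>     testValueState = getRuntimeContext().getState(
>             new ValueStateDescriptor<>("testMap",
>                     TypeInformation.of(new TypeHint<Map<String, String>>() {})));
> }
>
> @Override
> public void processElement(Map<String, String> value, Context ctx,
>         Collector<Map<String, String>> out) throws Exception {
>     // value() is a single RocksDB get and clear() deletes a single key, so
>     // neither needs the iterator that MapState#isEmpty()/clear() require.
>     if (testValueState.value() == null) {
>         testValueState.update(value);
>         out.collect(value);
>         testValueState.clear();
>     }
> }
>
> The trade-off is that the whole map is (de)serialized on every access, which
> only pays off when the map stays small.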
>
> Are we using the MapState in the correct way, or are we doing something
> wrong?
> Is this behaviour expected, given that Flink's recommendation is to use
> MapState and NOT ValueState?
>
> BR,
> Nick
>
>

