Hi,
   We started a Beam application on the Flink runner with a parallelism of 50.
It is a stateful application that uses RocksDB as the state backend. We use
timers as well as Beam's value state and bag state (bag state is the same as
Flink's ListState), and incremental checkpointing is enabled. With the initial
parallelism of 50, our application was able to process up to 50,000 records per
second. After a week, we took a savepoint and restarted from it with a
parallelism of 18. Now the application can only process about 7,000 records per
second. Throughput per task manager dropped to less than half of what it was
before: roughly 390 records per second per task manager (7,000 / 18), compared
with 1,000 (50,000 / 50) when we ran 50 task managers.

We didn't set maxParallelism in our Beam application, but the logs show that
maxParallelism was set to -1. Beam's documentation for the Flink runner also
states that maxParallelism defaults to -1:
https://beam.apache.org/documentation/runners/flink/

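However, the Flink documentation at
https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html
says that the default maxParallelism is roughly operatorParallelism +
(operatorParallelism / 2), with a lower bound of 128 and an upper bound of
32768. For our parallelism of 50 the formula gives 75, so the lower bound of
128 would apply.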

I don't understand how maxParallelism is determined when Beam's Flink runner
is given maxParallelism = -1. Could the larger number of key groups be causing
this performance degradation?
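For context on what the key groups do: as we understand it, Flink maps each key to a key group (a murmur hash of the key's hashCode, modulo maxParallelism) and then gives each subtask a contiguous range of key groups using keyGroupId * parallelism / maxParallelism. The sketch below uses hypothetical class names and assumes maxParallelism = 128. It ignores how our real keys hash and only counts how many key groups each subtask owns at parallelism 50 and 18.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: spread of key groups over subtasks (not Flink's own class).
final class KeyGroupSpread {

    // Key group -> subtask index, following the keyGroupId * parallelism / maxParallelism rule.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    // Returns a histogram: "key groups owned" -> "number of subtasks owning that many".
    static Map<Integer, Integer> histogram(int maxParallelism, int parallelism) {
        int[] perSubtask = new int[parallelism];
        for (int kg = 0; kg < maxParallelism; kg++) {
            perSubtask[operatorIndexForKeyGroup(maxParallelism, parallelism, kg)]++;
        }
        Map<Integer, Integer> hist = new TreeMap<>();
        for (int count : perSubtask) {
            hist.merge(count, 1, Integer::sum);
        }
        return hist;
    }

    public static void main(String[] args) {
        System.out.println("p=50: " + histogram(128, 50)); // p=50: {2=22, 3=28}
        System.out.println("p=18: " + histogram(128, 18)); // p=18: {7=16, 8=2}
    }
}
```

With 128 key groups, parallelism 50 leaves 22 subtasks with 2 key groups and 28 with 3, while parallelism 18 leaves 16 subtasks with 7 and 2 with 8. The total number of key groups is the same in both cases; what changes is how many each subtask (and its RocksDB instance) has to serve.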

Beam version - 2.19
Flink version - 1.9

Any suggestions/help would be appreciated.


Thanks
Sandeep Kathula

