Hi Vidya,

  1.  You could tune your job to avoid back pressure. Consider upgrading your 
Flink engine to at least Flink 1.13, which makes it much easier to monitor the 
back pressure status [1].
  2.  You can refer to [2] to learn how to implement a custom serializer; see 
also the sketch after the links for one way to supply an explicit serializer.


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/monitoring/back_pressure/
[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/custom_serialization/
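
For illustration, here is a rough sketch of one way to supply an explicit 
serializer for keyed state instead of letting Flink fall back to Kryo. 
MyAvroRecord is just a placeholder for an Avro-generated SpecificRecord class 
(it needs the flink-avro dependency); adapt it to your own type:

    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;

    // Passing an explicit TypeInformation means Flink does not need to infer
    // one for the state type (and fall back to Kryo for GenericType).
    ValueStateDescriptor<MyAvroRecord> descriptor =
            new ValueStateDescriptor<>(
                    "window-contents",
                    new AvroTypeInfo<>(MyAvroRecord.class));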

Best,
Yun Tang
________________________________
From: Vidya Sagar Mula <mulasa...@gmail.com>
Sent: Sunday, March 6, 2022 4:16
To: Yun Tang <myas...@live.com>
Cc: user <user@flink.apache.org>
Subject: Re: Incremental checkpointing & RocksDB Serialization

Hi Yun Tang,
Thank you for the reply. I have follow-up questions and need some more details. 
Can you please clarify my inline questions?

> Why is the incremental checkpointing taking more time for the snapshot at the 
> end of the window duration?

I guess that this is because the job is under back pressure at the end of the 
window. You can expand the checkpoint details to see whether the async duration 
of each task is much shorter than the e2e duration. If so, the checkpoint 
barriers stayed in the channels longer.

<VIDYA> - Yes, I expanded the checkpoint details and noticed that the e2e 
duration is much higher than the async duration. I am attaching the screenshot 
here (Checkpoint #59). Can you elaborate more on "the checkpoint barriers stay 
in the channel longer"? What are the suggested ways to mitigate this issue? I 
am wondering how this can be avoided, as it is happening only at the end of the 
window.
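
As an aside, would unaligned checkpoints help here? They are available since 
Flink 1.11 and let the barriers overtake buffered in-flight data. Roughly:

    // Sketch only; env is the job's StreamExecutionEnvironment.
    env.enableCheckpointing(2 * 60 * 1000);                      // 2-minute interval
    env.getCheckpointConfig().enableUnalignedCheckpoints(true);  // barriers overtake buffers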


> Do you suggest any change in the serializer type in the RocksDB? (Kryo vs 
> Avro)

From our experience, Kryo is not a good choice in most cases.

<VIDYA> - What are your recommendations for other serializers? I tried to 
switch to Avro by setting the "forceAvro" flag to TRUE in the ExecutionConfig, 
but RocksDB is still picking the KryoSerializer. This is because the 
transformation's key type is assigned as GenericType. I am not sure what 
changes need to be made to my class/POJO so that the Avro serializer is picked 
up. Can you please suggest how to switch to a better serializer?
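
My understanding (please correct me if I am wrong) is that forceAvro only 
applies to types Flink already recognizes as POJOs, so a GenericType still 
falls back to Kryo. A rough sketch of what the key class would need to look 
like to satisfy Flink's POJO rules (class and field names are made up):

    // Hypothetical key class following Flink's POJO rules: public class,
    // public no-argument constructor, and public (or getter/setter) fields
    // of supported types.
    public class MyKey {
        public String id;
        public long windowEnd;

        public MyKey() {}   // required no-argument constructor

        public MyKey(String id, long windowEnd) {
            this.id = id;
            this.windowEnd = windowEnd;
        }
    }

Calling env.getConfig().disableGenericTypes() should also make the job fail 
fast wherever Kryo would be used, which helps locate the fallback.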



On Fri, Mar 4, 2022 at 2:06 AM Yun Tang <myas...@live.com> wrote:
Hi Vidya,

> Why is the incremental checkpointing taking more time for the snapshot at the 
> end of the window duration?

I guess that this is because the job is under back pressure at the end of the 
window. You can expand the checkpoint details to see whether the async duration 
of each task is much shorter than the e2e duration. If so, the checkpoint 
barriers stayed in the channels longer.

> Why is RocksDB serialization causing the CPU peak?

This is caused by the implementation of your serializer.

> Do you suggest any change in the serializer type in the RocksDB? (Kryo vs 
> Avro)

From our experience, Kryo is not a good choice in most cases.

Best
Yun Tang
________________________________
From: Vidya Sagar Mula <mulasa...@gmail.com>
Sent: Friday, March 4, 2022 17:00
To: user <user@flink.apache.org>
Subject: Incremental checkpointing & RocksDB Serialization

Hi,

I have a cluster running Flink 1.11 with an AWS S3 backend. I am trying 
incremental checkpointing on this setup. I have a pipeline with a 10-minute 
window, and incremental checkpoints are taken every 2 minutes.
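
The checkpointing setup is roughly equivalent to the following sketch (the 
bucket path is a placeholder):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointSetup {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // RocksDB state backend on S3; the boolean enables incremental checkpoints
            env.setStateBackend(new RocksDBStateBackend("s3://<bucket>/checkpoints", true));
            env.enableCheckpointing(2 * 60 * 1000);  // checkpoint every 2 minutes
            // ... 10-minute window pipeline goes here ...
            env.execute("incremental-checkpointing-job");
        }
    }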

Observation:
-------------
I am observing a long duration while taking the snapshot at the end of each 
window, i.e., the last checkpoint of every window (almost every time).
I am attaching the Flink UI checkpoint history.

My set up details:
-------------------
Cluster: Cloud cluster with instance storage.
Memory : 20 GB,
Heap : 10 GB
Flink Managed Memory: 4.5 GB
Flink Version : 1.11
CPUs : 2

ROCKSDB_WRITE_BUFFER_SIZE: "2097152000"  ## 2 GB
ROCKSDB_BLOCK_CACHE_SIZE: "104857600"    ## 100 MB
ROCKSDB_BLOCK_SIZE: "5242880"            ## 5 MB
ROCKSDB_CHECKPOINT_TRANSFER_THREAD_NUM: 4
ROCKSDB_MAX_BACKGROUND_THREADS: 4


In the analysis, I noticed that CPU utilization peaks at almost 100% at the 
time of the issue. Further analysis with thread dumps taken during the CPU peak 
shows a RocksDB serialization-related call trace. All the thread samples point 
to this stack.

Based on the pipeline transformation class type, RocksDB is choosing the Kryo 
serializer. I did try to change the serializer type, but that is not the focal 
point I want to stress here.
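
A quick way to double-check which serializer Flink derives for a type is 
something like the following sketch (MyKey is a placeholder for the actual key 
class); GenericTypeInfo in the output means the Kryo fallback is used:

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    // Print the derived TypeInformation and the serializer it creates.
    TypeInformation<MyKey> keyInfo = TypeInformation.of(MyKey.class);
    System.out.println(keyInfo);                                   // e.g. GenericType<MyKey>
    System.out.println(keyInfo.createSerializer(new ExecutionConfig()));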

I would like to understand the reason for the high CPU utilization. I have 
tried increasing the CPUs to 2 and 4, but that did not give any better results. 
I have parallelism 2.

Please take a look at the stack traces below. Can you suggest why so much CPU 
is spent serializing/deserializing in RocksDB?

########

Stack-1, Stack-2, Stack-3 are attached to this email.

Questions:
-----------
- Why is the incremental checkpointing taking more time for the snapshot at the 
end of the window duration?
- Why is RocksDB serialization causing the CPU peak?
- Do you suggest any change in the serializer type in the RocksDB? (Kryo vs 
Avro)

Thank you,








