Hi Oleg,

This indeed sounds like abnormal behavior. Are you sure that these large checkpoints are related to the Kafka consumer only? Are there other operators in the pipeline? Internally, the state kept in a Kafka consumer is pretty minimal and only related to Kafka partition and offset management.
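To give a rough sense of the expected size, here is an illustrative sketch (not the exact internal representation): the checkpointed state is essentially one (topic, partition) -> offset entry per partition, so with 32 partitions it should serialize to kilobytes, not gigabytes.

    // Illustrative only: roughly the shape of what the consumer checkpoints,
    // one (topic, partition) -> offset entry per assigned partition.
    import java.util.HashMap;
    import java.util.Map;

    public class OffsetStateSketch {
        public static void main(String[] args) {
            Map<String, Long> offsets = new HashMap<>();
            for (int partition = 0; partition < 32; partition++) {
                offsets.put("input-topic-" + partition, 1_234_567L); // last processed offset
            }
            // 32 small entries: the serialized form is in the kilobyte range,
            // nowhere near tens of gigabytes.
            System.out.println(offsets.size() + " entries");
        }
    }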

If you are sure that the Kafka consumer must be producing such a state size, I would recommend using a remote debugger and checking what is checkpointed in the corresponding `FlinkKafkaConsumerBase#snapshotState`.
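For example, you could open a debug port on the TaskManagers and attach your IDE with a breakpoint in that method. This is only a sketch of the idea; adjust the port and options to your environment:

    # flink-conf.yaml (sketch): open a JDWP port on each TaskManager so an IDE
    # can attach and break inside FlinkKafkaConsumerBase#snapshotState.
    # suspend=n keeps the JVM from waiting for the debugger at startup.
    env.java.opts.taskmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"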

Regards,
Timo


On 15.04.20 03:37, Oleg Vysotsky wrote:
Hello,

Sometimes our Flink job starts creating large checkpoints that include 55 GB (instead of 2 MB) related to the Kafka source. After the Flink job creates the first “abnormal” checkpoint, all subsequent checkpoints are “abnormal” as well. The Flink job can’t be restored from such a checkpoint: restoring from it hangs or fails, the Flink dashboard hangs, and the Flink cluster crashes during the restore. We didn’t catch any related error message. We also haven’t found a clear way to reproduce the problem (i.e. to make the Flink job create “abnormal” checkpoints).

Configuration:

We are using Flink 1.8.1 on EMR (EMR 5.27)

Kafka: Confluent Kafka 5.4.1

Flink Kafka connector: org.apache.flink:flink-connector-kafka_2.11:1.8.1 (it pulls in the org.apache.kafka:kafka-clients:2.0.1 dependency)

Our input Kafka topic has 32 partitions and the related Flink source has a parallelism of 32

We use pretty much all default Flink Kafka consumer settings. We only specified the following (a rough sketch of how the consumer is wired up follows the list):

CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,

ConsumerConfig.GROUP_ID_CONFIG,

CommonClientConfigs.SECURITY_PROTOCOL_CONFIG
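
Roughly, the consumer is set up like this (just a sketch; topic name, bootstrap servers, group id, and schema below are placeholders, everything else is left at its default):

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class JobSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

            // Only these three consumer properties are set explicitly.
            Properties props = new Properties();
            props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "our-group");
            props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

            FlinkKafkaConsumer<String> source =
                new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);

            // Source parallelism matches the 32 topic partitions.
            env.addSource(source).setParallelism(32).print();
            env.execute("job sketch");
        }
    }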

Thanks a lot in advance!

Oleg
