Dear Flink Community,
When using exactly-once checkpointing along with Kafka, and connecting to Kafka
using SASL_SSL, the job gets stuck with the Kafka producer in the Initialising
state for around 10 minutes; after that it functions normally across multiple
checkpoints. Below are the relevant configs and observations.
Flink/Job config:
VERSION: Flink 1.13.1 for both the environment and the jars.
1. Single node, with checkpointing in exactly-once mode and a checkpointing
interval of 2 minutes (I tried 10 minutes as well; same issue).
2. The job has a Flink Kafka consumer for topic 1 as the source and a Flink
Kafka producer for topic 2 as the sink, with parallelism 4 for both.
3. The Kafka producer uses the FlinkKafkaProducer implementation in
EXACTLY_ONCE mode and connects to the broker using SASL_SSL, where the SSL
certificate is self-signed.
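A minimal sketch of the client properties behind item 3, assuming SCRAM-SHA-512 as the SASL mechanism (the debug logs only say SCRAM) and a truststore file that holds the self-signed broker certificate. The class name, broker address, credentials and paths are hypothetical placeholders; only standard Kafka client property keys are used.

```java
import java.util.Properties;

/**
 * Hypothetical helper that assembles Kafka client properties for a SASL_SSL
 * connection authenticated with SCRAM against a broker whose TLS certificate
 * is self-signed. All argument values are placeholders supplied by the caller.
 */
final class SaslSslProducerConfig {

    private SaslSslProducerConfig() {}

    static Properties build(String bootstrapServers,
                            String username,
                            String password,
                            String truststorePath,
                            String truststorePassword) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);

        // TLS-encrypted transport plus SASL authentication.
        props.setProperty("security.protocol", "SASL_SSL");

        // Assumption: the broker is configured for SCRAM-SHA-512;
        // switch to SCRAM-SHA-256 if that is what the broker uses.
        props.setProperty("sasl.mechanism", "SCRAM-SHA-512");

        // JAAS entry for SCRAM. Note: quotes inside the password are not escaped here.
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"" + username + "\" "
                        + "password=\"" + password + "\";");

        // A self-signed broker certificate has to be trusted explicitly.
        props.setProperty("ssl.truststore.location", truststorePath);
        props.setProperty("ssl.truststore.password", truststorePassword);
        return props;
    }
}
```

Such a `Properties` object would then be handed to the FlinkKafkaProducer constructor together with `FlinkKafkaProducer.Semantic.EXACTLY_ONCE`; the serialization schema and topic name are left to the job.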
Kafka config:
VERSION: 2.8 for the broker. For the Flink job's Kafka client, I tried both
2.4.1 (shipped with the Flink Kafka connector) and 2.8.0.
1. Single broker, with both topics having 4 partitions.
2. Everything works correctly when using the Kafka consumer commands in
SASL_SSL mode.
Observations from the debug logs:
Background: In the dev environment with a PLAINTEXT connection, everything was
working perfectly. When I changed PLAINTEXT to SASL_SSL, I was getting timeout
exceptions. These were resolved after I increased max.block.ms from 60000 to
90000, but then I started facing the delayed-start issue.
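In the Kafka Java client, max.block.ms caps how long blocking calls, including `KafkaProducer#initTransactions()`, may wait before throwing a `TimeoutException`, which is consistent with the timeouts disappearing once it was raised from 60000 to 90000. A hypothetical helper (name and shape are illustrative only) for applying such an override to a copy of the producer properties, so a shared configuration is not mutated:

```java
import java.time.Duration;
import java.util.Properties;

/**
 * Hypothetical helper: returns a copy of the given producer properties with
 * max.block.ms set to the requested duration. The input is left unchanged.
 */
final class ProducerTimeouts {

    private ProducerTimeouts() {}

    static Properties withMaxBlock(Properties base, Duration maxBlock) {
        if (maxBlock.isNegative() || maxBlock.isZero()) {
            throw new IllegalArgumentException("max.block.ms must be positive");
        }
        Properties copy = new Properties();
        copy.putAll(base); // copies the explicit entries of the base config
        copy.setProperty("max.block.ms", Long.toString(maxBlock.toMillis()));
        return copy;
    }
}
```

For example, `ProducerTimeouts.withMaxBlock(props, Duration.ofSeconds(90))` yields a configuration with max.block.ms set to 90000 while `props` keeps its original value.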
1. About errors/exceptions:
I did not find any exception in the debug logs, but I noticed the following
pattern, which keeps repeating until initialisation succeeds:
SSL_HANDSHAKE -> SCRAM_AUTHENTICATION -> FIND_COORDINATOR -> FETCH PRODUCER_ID
-> Transition from state INITIALIZING to READY -> CLOSE PRODUCER
The producer keeps repeating the above steps until the following pattern occurs:
SSL_HANDSHAKE -> SCRAM_AUTHENTICATION -> FIND_COORDINATOR -> FETCH PRODUCER_ID
-> Transition from state INITIALIZING to READY -> Transition from state READY
to IN_TRANSACTION
Also, if I cancel the job while the Kafka producer/sink is stuck in the
Initialising state, the Kafka sink operator gets stuck, I get the error "Task did
not exit gracefully within 180 + seconds.", and the task manager crashes.
2. Turning off SSL and using SASL_PLAINTEXT: when connecting to the Kafka broker
in SASL_PLAINTEXT mode, the Kafka producer gets initialised and starts
processing data in 50-60 seconds.
3. Disabling checkpointing: when checkpointing is disabled and SASL_SSL or
SASL_PLAINTEXT mode is used for Kafka, the Kafka producer gets initialised
immediately, but the Kafka consumer or sink takes 10-15 seconds to initialise,
after which the app operators start processing data.
4. I wrote a producer directly using KafkaProducer from the Kafka client
library and added transactions to it. I observed that
kafkaProducer.initTransactions() takes around 30 seconds to execute, after which
the producer starts publishing data to the Kafka topic.
5. The same delayed-start pattern seems to appear in case of a crash and
automatic restart of the job.
6. No exception is shown for the job in the Flink Dashboard UI.
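To put a number on the repeating pattern from observation 1, a hypothetical log scanner could count how many INITIALIZING-to-READY transitions occur before the first READY-to-IN_TRANSACTION transition. It assumes only that each relevant debug line contains the text "Transition from state FROM to TO", as quoted above; with parallelism 4 the lines of all subtasks are interleaved, so the count covers every subtask up to the first one that starts a transaction.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical log scanner for the producer state transitions quoted in the
 * debug logs. Assumes lines contain "Transition from state FROM to TO".
 */
final class TransitionCounter {

    private static final Pattern TRANSITION =
            Pattern.compile("Transition from state (\\w+) to (\\w+)");

    private TransitionCounter() {}

    /**
     * Returns the number of INITIALIZING to READY transitions seen before the
     * first READY to IN_TRANSACTION transition, or -1 if no transaction starts.
     */
    static int initsBeforeFirstTransaction(List<String> lines) {
        int inits = 0;
        for (String line : lines) {
            Matcher m = TRANSITION.matcher(line);
            if (!m.find()) {
                continue; // not a state-transition line
            }
            String from = m.group(1);
            String to = m.group(2);
            if (from.equals("INITIALIZING") && to.equals("READY")) {
                inits++;
            } else if (from.equals("READY") && to.equals("IN_TRANSACTION")) {
                return inits;
            }
        }
        return -1;
    }
}
```

Feeding it the attached log excerpt (read with `Files.readAllLines`) would show how many producer initialisations preceded the first transaction.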
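For observation 4, a tiny hypothetical timing helper makes the initTransactions() measurement repeatable across SASL_SSL and SASL_PLAINTEXT runs. It simply wraps any blocking call; the producer itself (which needs transactional.id set to use transactions) is assumed to be created elsewhere.

```java
import java.time.Duration;

/**
 * Hypothetical helper that measures the wall-clock duration of a blocking
 * call, e.g. KafkaProducer#initTransactions().
 */
final class BlockingCallTimer {

    private BlockingCallTimer() {}

    static Duration time(Runnable call) {
        long start = System.nanoTime(); // monotonic clock, suitable for intervals
        call.run();
        return Duration.ofNanos(System.nanoTime() - start);
    }
}
```

Since `initTransactions()` takes no arguments and returns nothing, it can be passed as `BlockingCallTimer.time(producer::initTransactions)`.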
Please find attached a copy of part of the log.