Dear Flink Community,
                                        When using exactly once checkpointing 
along kafka and connecting Kafka using SASL_SSL the job gets stuck with status 
of Kafka producer in Initialising state for around 10 mins and post that it 
functions normally across multiple checkpoints. Following are various config 
and observations.

Flink/Job config:  
VERSION: Flink 1.13.1 for both env and jars.

1. Single node with checkpointing in exactly once mode and checkpointing 
interval as 2 minutes. ( tried with 10 minutes still same issue)
2. The job has source as flink kafka consumer for topic 1 and sink as flink 
kafka producer for topic 2 and parallelism as 4 for source & sink.
3. The kafka producer uses FlinkKafkaProducer implementation in EXACTLY_ONCE 
mode and connects to broker using SASL_SSL where SSL certificate is self signed.


Kafka config:
VERSION: 2.8 for broker. For Flink Job tried 2.4.1 (shipped with flink kafka 
connector) and 2.8.0

1. Single broker with both topics having 4 partitions.
2. Everything works correctly using Kafka Consumer commands when using SASL_SSL 
mode.

Observation for debug logs:
Background: In dev env with PLAIN_TEXT connection everything was working 
perfectly. When I changed PLAIN_TEXT to SASL_SSL I was get timeout exceptions. 
Which was resolved after I increased max.block.ms from 60000 to 90000 but then 
started facing delayed start issue.

1. About error/exception:
        I did not find any exception in debug logs but the notified following 
pattern which keeps repeating until successful initialisation
        SSL_HANDSHAKE --> SCRAM_AUTHENTICATION —> FIND_COORDINATOR —> FETCH 
PRODUCER_ID —> Transition from state INITIALIZING to READY —> CLOSE PRODUCER
        
The producer keep repeating above step until following pattern occurs 
        SSL_HANDSHAKE --> SCRAM_AUTHENTICATION —> FIND_COORDINATOR —> FETCH 
PRODUCER_ID —> Transition from state INITIALIZING to READY —> READY to 
IN_TRANSACTION.

Also while kafka producer/sink is stuck in Initialising state if I cancel job 
the kafka sink operator gets stuck and I get following error  Task did not exit 
gracefully within 180 + seconds. And task manager crashes.

2. Turning off SSL and using SASL_PLAINTEXT - When connecting to kafka broker 
using SASL_PLAINTEXT mode the kafka producer gets initialised and starts 
processing data in 50-60 secs.

3. Disable checkpointing - When checkpointing is disabled and use 
SASL_SSL/SASL_PLAINTEXT mode fro kafka. The kafka producer get initialised 
immediately but Kafka Consumer or sink takes 10-15 secs to initialise and then 
app operators start processing data.

4. If wrote a producer directly using kafka producer from kafka library. And 
added transaction to it. My obeservation was kafkaProducer.initTransactions() 
method takes around 30 sec to execute post which it starting publishing data to 
kafka topic.

5. Same pattern of delayed start seems to appear incase of crash and automatic 
restart of job.

6. No exception in Flink Dashboard UI for job.

PFA, a copy of part of log.

Reply via email to