Hi,

We have a Flink 1.11.1 streaming pipeline in production that reads from Kafka.
- Kafka server version: 2.5.0 (Confluent 5.5.0)
- Kafka client version: 2.4.1, per the startup log: {"component":"org.apache.kafka.common.utils.AppInfoParser$AppInfo","message":"Kafka version: 2.4.1","method":"<init>"}
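For context, the consumer is configured through a standard java.util.Properties object that gets passed to the Flink Kafka source. A minimal sketch of that setup is below; the broker address and timeout values are illustrative defaults, not our actual production configuration:

```java
import java.util.Properties;

public class ConsumerProps {
    // Builds the Properties handed to the Flink Kafka consumer.
    // Broker address and timeout values are illustrative, not production settings.
    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker-1:9092"); // illustrative address
        props.setProperty("group.id", "MFTDataProcessorEventSignatureConsumerGroupV1R1");
        // Standard Kafka client settings that govern broker reconnect behavior
        props.setProperty("request.timeout.ms", "30000");      // client default: 30 s
        props.setProperty("reconnect.backoff.ms", "50");       // client default: 50 ms
        props.setProperty("reconnect.backoff.max.ms", "1000"); // client default: 1 s
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("group.id"));
    }
}
```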
Occasionally (every 6 to 12 hours), we have observed that the Kafka consumption rate drops sharply (but NOT to 0). Normally the consumption rate across all consumers is about 4k records/sec; when this issue occurs, it drops to under 50 records/sec, and the following logs appear:

org.apache.kafka.common.errors.DisconnectException: null
{"time":"2021-07-07T22:13:37,385","severity":"INFO","component":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3, groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Error sending fetch request (sessionId=405798138, epoch=5808) to node 8: {}.","method":"handleError"}

org.apache.kafka.common.errors.TimeoutException: Failed
{"time":"2021-07-07T22:26:41,379","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"[Consumer clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3, groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Group coordinator 100.98.40.16:9092 (id: 2147483623 rack: null) is unavailable or invalid, will attempt rediscovery","method":"markCoordinatorUnknown"}

{"time":"2021-07-07T22:27:10,465","severity":"INFO","component":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler","message":"[Consumer clientId=consumer-MFTDataProcessorEventSignatureConsumerGroupV1R1-3, groupId=MFTDataProcessorEventSignatureConsumerGroupV1R1] Discovered group coordinator 100.98.40.16:9092 (id: 2147483623 rack: null)","method":"onSuccess"}

The consumers retried for more than an hour, but the same logs kept appearing. The consumers only started pulling data again after a manual restart. No WARN or ERROR logs were observed in Kafka or ZooKeeper during this period.

Our takeaway from this incident is that the Kafka consumer's own retries could not resolve the issue, but a manual restart (or a Flink-internal restart via the failure-rate restart policy) does.
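For reference, the failure-rate restart policy mentioned above is configured in flink-conf.yaml along these lines; the thresholds shown here are illustrative, not our production values:

```yaml
# Restart the job unless more than max-failures-per-interval failures
# occur within failure-rate-interval; wait "delay" between attempts.
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
```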
Has anyone faced this issue before? Any pointers are appreciated.

Regards,
Rahul