Hello,
I am using Kafka server 3.4.0 along with Flink. The Kafka server and Flink are
installed on the same 48-core, 252 GB box. My use case is as follows:
8 Kafka producers write events at 200K per second to the Kafka topic "Event"
(20 partitions), which is the source for Flink --> Flink processing rules read
from the Event topic and write to the Alert topic --> Kafka topic "Alert"
(20 partitions), which is the sink for Flink.
It was all good until we started seeing the Flink Kafka consumer for the Event
topic time out frequently, because Kafka responds very late to its fetch
requests. I am not able to figure out why Kafka intermittently takes so long
to process a FETCH request. Is there a configuration that I should look at, or
any other log that I should check, to figure out what is going on? When
everything is fine, Kafka takes only a few milliseconds to process a fetch
request.
The Kafka consumer timeout on the Flink side is 30 seconds, and the consumer
thread blocks until it gets a response or times out.
Here are some Kafka trace logs for one slow FETCH request (correlationId=39532):
server.log
[2023-09-13 17:37:05,463] TRACE [Kafka Request Handler 1 on Broker 1], Kafka
request handler 1 on broker 1 handling request Request(processor=0,
connectionId=127.0.0.1:9092-127.0.0.1:39268-2028,
session=Session(User:ANONYMOUS,/127.0.0.1),
listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, buffer=null,
envelope=None) (kafka.server.KafkaRequestHandler)
[2023-09-13 17:37:05,463] TRACE Sending FETCH response to client Event-8 of
1048654 bytes. (kafka.network.RequestChannel)
[2023-09-13 17:37:05,463] TRACE Socket server received response to send to
127.0.0.1:9092-127.0.0.1:39268-2028, registering for write and sending data:
Response(type=Send, request=Request(processor=0,
connectionId=127.0.0.1:9092-127.0.0.1:39268-2028,
session=Session(User:ANONYMOUS,/127.0.0.1),
listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, buffer=null,
envelope=None), send=MultiRecordsSend(size=1048654, totalWritten=0),
asString=Some({"throttleTimeMs":0,"errorCode":0,"sessionId":1670662638,"responses":[{"topicId":"MtsZtV-bRoCAwPiHqRIzvA","partitions":[{"partitionIndex":14,"errorCode":0,"highWatermark":37934921072,"lastStableOffset":37934921072,"logStartOffset":31032998864,"abortedTransactions":null,"preferredReadReplica":-1,"recordsSizeInBytes":1048576}]}]}))
(kafka.network.Processor)
request-logs
[2023-09-13 17:37:05,463] TRACE Processor 0 received request:
RequestHeader(apiKey=FETCH, apiVersion=13, clientId=Event-8,
correlationId=39532, headerVersion=2) -- FetchRequestData(clusterId=null,
replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0,
sessionId=1670662638, sessionEpoch=10, topics=[FetchTopic(topic='',
topicId=MtsZtV-bRoCAwPiHqRIzvA, partitions=[FetchPartition(partition=14,
currentLeaderEpoch=66, fetchOffset=37656832058, lastFetchedEpoch=-1,
logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[],
rackId='') (kafka.network.RequestChannel$)
[2023-09-13 17:37:05,463] TRACE [KafkaApi-1] Handling
request:RequestHeader(apiKey=FETCH, apiVersion=13, clientId=Event-8,
correlationId=39532, headerVersion=2) -- FetchRequestData(clusterId=null,
replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0,
sessionId=1670662638, sessionEpoch=10, topics=[FetchTopic(topic='',
topicId=MtsZtV-bRoCAwPiHqRIzvA, partitions=[FetchPartition(partition=14,
currentLeaderEpoch=66, fetchOffset=37656832058, lastFetchedEpoch=-1,
logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[],
rackId='') from connection
127.0.0.1:9092-127.0.0.1:39268-2028;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS
(kafka.server.KafkaApis)
[2023-09-13 17:37:05,463] TRACE [KafkaApi-1] Sending Fetch response with
partitions.size=1, metadata=1670662638 (kafka.server.KafkaApis)
[2023-09-13 17:39:30,223] DEBUG Completed
request:{"isForwarded":false,"requestHeader":{"requestApiKey":1,"requestApiVersion":13,"correlationId":39532,"clientId":"Event-8","requestApiKeyName":"FETCH"},"request":{"replicaId":-1,"maxWaitMs":500,"minBytes":1,"maxBytes":52428800,"isolationLevel":0,"sessionId":1670662638,"sessionEpoch":10,"topics":[{"topicId":"MtsZtV-bRoCAwPiHqRIzvA","partitions":[{"partition":14,"currentLeaderEpoch":66,"fetchOffset":37656832058,"lastFetchedEpoch":-1,"logStartOffset":-1,"partitionMaxBytes":1048576}]}],"forgottenTopicsData":[],"rackId":""},"response":{"throttleTimeMs":0,"errorCode":0,"sessionId":1670662638,"responses":[{"topicId":"MtsZtV-bRoCAwPiHqRIzvA","partitions":[{"partitionIndex":14,"errorCode":0,"highWatermark":37934921072,"lastStableOffset":37934921072,"logStartOffset":31032998864,"abortedTransactions":null,"preferredReadReplica":-1,"recordsSizeInBytes":1048576}]}]},"connection":"127.0.0.1:9092-127.0.0.1:39268-2028","totalTimeMs":144760.386,"requestQueueTimeMs":0.192,"localTimeMs":0.476,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.035,"sendTimeMs":144759.682,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":{"softwareName":"apache-kafka-java","softwareVersion":"3.4.0"}}
(kafka.request.logger)
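In case it helps with reading the "Completed request" line above, here is a
minimal Python sketch that breaks down its timing fields. The helper names
time_breakdown and partition_lag are made up for illustration, and the JSON is
a trimmed copy of the logged entry that keeps only the timing and offset
fields. For this request, sendTimeMs accounts for practically all of
totalTimeMs, while the queue and local processing times are each under a
millisecond. The fetch offset for partition 14 is also 278089014 records
behind the high watermark.

```python
import json

# Trimmed copy of the "Completed request" entry logged above: only the
# timing fields and the offsets for partition 14 are kept.
completed = json.loads("""
{
  "request": {"topics": [{"partitions": [
    {"partition": 14, "fetchOffset": 37656832058}]}]},
  "response": {"responses": [{"partitions": [
    {"partitionIndex": 14, "highWatermark": 37934921072}]}]},
  "totalTimeMs": 144760.386,
  "requestQueueTimeMs": 0.192,
  "localTimeMs": 0.476,
  "remoteTimeMs": 0.0,
  "throttleTimeMs": 0,
  "responseQueueTimeMs": 0.035,
  "sendTimeMs": 144759.682
}
""")

# Timing components reported by kafka.request.logger for each request.
TIMING_FIELDS = [
    "requestQueueTimeMs", "localTimeMs", "remoteTimeMs",
    "throttleTimeMs", "responseQueueTimeMs", "sendTimeMs",
]


def time_breakdown(entry):
    """Return each timing component as a fraction of totalTimeMs."""
    total = entry["totalTimeMs"]
    return {field: entry[field] / total for field in TIMING_FIELDS}


def partition_lag(entry):
    """Return highWatermark minus fetchOffset for the first partition."""
    requested = entry["request"]["topics"][0]["partitions"][0]
    returned = entry["response"]["responses"][0]["partitions"][0]
    return returned["highWatermark"] - requested["fetchOffset"]


shares = time_breakdown(completed)
dominant = max(shares, key=shares.get)
lag = partition_lag(completed)

for field in TIMING_FIELDS:
    print(f"{field:>20}: {completed[field]:>12.3f} ms ({shares[field]:.4%})")
print(f"dominant component: {dominant}")
print(f"records behind high watermark: {lag}")
```

The same two helpers could be applied to every "Completed request" line in the
request log to check whether the slow fetches are always dominated by the same
component.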
Thanks,
Neha