AlexeyASF created KAFKA-16319:
---------------------------------

Summary: Wrong broker destinations for DeleteRecords requests when more than one topic is involved and the topics/partitions are led by different brokers
Key: KAFKA-16319
URL: https://issues.apache.org/jira/browse/KAFKA-16319
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.6.1
Reporter: AlexeyASF

h2. Context

Kafka Streams applications periodically send {{DeleteRecords}} requests via the {{org.apache.kafka.streams.processor.internals.TaskManager#maybePurgeCommittedRecords}} method. Such requests may involve more than one topic (or partition), and each part of the request is supposed to be sent to the broker that leads the corresponding partition.

h2. Observed behaviour

When a {{DeleteRecords}} request includes more than one topic (say two, {{topic1}} and {{topic2}}), and these topics are led by different brokers (say {{broker1}} and {{broker2}} respectively), the whole request is sent to only one broker (say {{broker1}}), so the response partially fails with not_leader_or_follower errors.

Because the request did not fully succeed ({{topic1}} is fine, but {{topic2}} is not), it is retried with the _same_ arguments against the _same_ broker ({{broker1}}), so the response is partially faulty again and again.

It also may (and does) happen that there is a "mirrored" half-faulty request, in this case to {{broker2}}, where the {{topic2}} operation succeeds but the {{topic1}} operation fails.

Here is an anonymised log example from a production system ("direct" and "mirrored" requests, one after another):

{code:java}
[AdminClient clientId=worker-admin] Sending DeleteRecordsRequestData(topics=[
    DeleteRecordsTopic(
      name='topic1',
      partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)]
    ),
    DeleteRecordsTopic(
      name='topic2',
      partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)]
    )],
  timeoutMs=60000)
  to broker1:PORT (id: 2 rack: RACK1). // <-- Note the broker, it's broker1
  correlationId=42003907, timeoutMs=30000

[AdminClient clientId=worker-admin] Sending DeleteRecordsRequestData(topics=[
    DeleteRecordsTopic(
      name='topic1',
      partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)]
    ),
    DeleteRecordsTopic(
      name='topic2',
      partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)]
    )],
  timeoutMs=60000)
  to broker2:9098 (id: 4 rack: RACK2). // <-- Note the broker, here it's broker2
  correlationId=42003906, timeoutMs=30000
{code}

Such a request yields the following response (shown here only for the "direct" request):

{code:java}
[AdminClient clientId=worker-admin] Call(
  callName=deleteRecords(api=DELETE_RECORDS),
  deadlineMs=...,
  tries=..., // Can be hundreds
  nextAllowedTryMs=...)
got response DeleteRecordsResponseData(
  throttleTimeMs=0,
  topics=[
    DeleteRecordsTopicResult(
      name='topic2',
      partitions=[DeleteRecordsPartitionResult(
        partitionIndex=5, lowWatermark=-1, errorCode=6)]), // <-- Note the errorCode 6, which is not_leader_or_follower
    DeleteRecordsTopicResult(
      name='topic1',
      partitions=[DeleteRecordsPartitionResult(
        partitionIndex=5, lowWatermark=..., errorCode=0)]) // <-- Note the errorCode 0, which means the operation was successful
  ]
)
{code}

h2. Expected behaviour

When a {{DeleteRecords}} request involves more than one topic/partition and those partitions are led by different brokers, each partition's part of the request is sent to the broker that leads that partition.

h2. Notes

* _presumably_, introduced in 3.6.1 via [https://github.com/apache/kafka/pull/13760].

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
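
To illustrate the routing described under "Expected behaviour", here is a minimal sketch in plain Java. It is not the AdminClient implementation and does not use any Kafka classes: {{LeaderGrouping}} and {{groupByLeader}} are hypothetical names, and the leader map is hard-coded from the broker ids in the logs above (id 2 for broker1, id 4 for broker2) instead of being read from cluster metadata. It only shows the idea of splitting the partitions of one logical request into one group per leader, so that each group could be sent to its own broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not part of Kafka.
class LeaderGrouping {

    // Groups "topic-partition" keys by the id of the broker that leads them.
    // The input maps each "topic-partition" key to its (assumed known) leader id.
    static Map<Integer, List<String>> groupByLeader(Map<String, Integer> leaderByPartition) {
        Map<Integer, List<String>> byLeader = new TreeMap<>();
        // Iterate in sorted key order so the result is deterministic.
        for (Map.Entry<String, Integer> e : new TreeMap<>(leaderByPartition).entrySet()) {
            byLeader.computeIfAbsent(e.getValue(), id -> new ArrayList<>()).add(e.getKey());
        }
        return byLeader;
    }

    public static void main(String[] args) {
        Map<String, Integer> leaders = new TreeMap<>();
        leaders.put("topic1-5", 2); // assumed: led by broker1 (id 2), as in the logs
        leaders.put("topic2-5", 4); // assumed: led by broker2 (id 4), as in the logs

        // One group per leader: each would become a separate per-broker request.
        System.out.println(groupByLeader(leaders));
        // prints {2=[topic1-5], 4=[topic2-5]}
    }
}
```

With this grouping, the request for {{topic1}} partition 5 would go only to broker id 2 and the request for {{topic2}} partition 5 only to broker id 4, instead of both partitions being sent to a single broker.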