AlexeyASF created KAFKA-16319:
---------------------------------

             Summary: Wrong broker destinations for DeleteRecords requests when 
more than one topic is involved and the topics/partitions are led by different 
brokers
                 Key: KAFKA-16319
                 URL: https://issues.apache.org/jira/browse/KAFKA-16319
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.6.1
            Reporter: AlexeyASF


h2. Context

Kafka Streams applications periodically send {{DeleteRecords}} requests via the 
{{org.apache.kafka.streams.processor.internals.TaskManager#maybePurgeCommittedRecords}}
 method. Such requests may involve more than one topic (or partition) and are 
supposed to be sent to the brokers that lead those partitions.
 
h2. Observed behaviour

When a {{DeleteRecords}} request includes more than one topic (say two, 
{{topic1}} and {{topic2}}) and these topics are led by different brokers (say 
{{broker1}} and {{broker2}} respectively), the request is sent to only one 
broker (say {{broker1}}), which leads to partial not_leader_or_follower errors. 
Because the request was only partly successful ({{topic1}} is fine, but 
{{topic2}} is not), it gets retried with the _same_ arguments to the _same_ 
broker ({{broker1}}), so the response is partially faulty again and again. It 
also may (and does) happen that there is a "mirrored" half-faulty request, in 
this case to {{broker2}}, where the {{topic2}} operation succeeds but the 
{{topic1}} operation fails.

Here's an anonymised log example from a production system ("direct" and 
"mirrored" requests, one after another):
{code:java}
[AdminClient clientId=worker-admin]
Sending DeleteRecordsRequestData(topics=[
  DeleteRecordsTopic(
    name='topic1',
    partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)]
  ),
  DeleteRecordsTopic(
    name='topic2',
    partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)]
)], timeoutMs=60000)
to broker1:PORT (id: 2 rack: RACK1). // <-- Note the broker, it's broker1
correlationId=42003907, timeoutMs=30000

[AdminClient clientId=worker-admin]
Sending DeleteRecordsRequestData(topics=[
  DeleteRecordsTopic(
    name='topic1',
    partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)]
  ),
  DeleteRecordsTopic(
    name='topic2',
    partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)]
)], timeoutMs=60000)
to broker2:9098 (id: 4 rack: RACK2). // <-- Note the broker, here it's broker2
correlationId=42003906, timeoutMs=30000 {code}
Such a request results in the following response (shown here only for the 
"direct" request):
{code:java}
[AdminClient clientId=worker-admin]
Call(
  callName=deleteRecords(api=DELETE_RECORDS),
  deadlineMs=...,
  tries=..., // Can be hundreds
  nextAllowedTryMs=...)
got response DeleteRecordsResponseData(
  throttleTimeMs=0,
  topics=[
    DeleteRecordsTopicResult(
      name='topic2',
      partitions=[DeleteRecordsPartitionResult(
        partitionIndex=5, lowWatermark=-1, errorCode=6)]), // <-- Note the errorCode 6, which is not_leader_or_follower
    DeleteRecordsTopicResult(
      name='topic1',
      partitions=[DeleteRecordsPartitionResult(
        partitionIndex=5, lowWatermark=..., errorCode=0)]) // <-- Note the errorCode 0, which means the operation was successful
  ]
) {code}
h2. Expected behaviour

When more than one topic/partition is involved and the partitions are led by 
different brokers, {{DeleteRecords}} requests are sent to each partition's 
leader broker.
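
The expected routing can be sketched as follows. This is only an illustration, 
not the AdminClient implementation: the class {{DeleteRecordsRouting}}, the 
{{TopicPartition}} record and the {{groupByLeader}} helper are hypothetical 
names, and the leader map is assumed to come from up-to-date cluster metadata. 
Each partition's offset goes into a batch keyed by its leader's broker id, so 
every broker only receives the partitions it leads.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class DeleteRecordsRouting {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) {}

    /**
     * Splits the requested delete-before offsets into one batch per leader broker.
     * Partitions with no known leader are skipped in this sketch; a real client
     * would have to refresh its metadata and retry them.
     */
    static Map<Integer, Map<TopicPartition, Long>> groupByLeader(
            Map<TopicPartition, Long> offsets,
            Map<TopicPartition, Integer> leaderByPartition) {
        Map<Integer, Map<TopicPartition, Long>> batches = new TreeMap<>();
        for (Map.Entry<TopicPartition, Long> e : offsets.entrySet()) {
            Integer leaderId = leaderByPartition.get(e.getKey());
            if (leaderId == null) {
                continue; // leader unknown (assumption: handled elsewhere)
            }
            batches.computeIfAbsent(leaderId, id -> new LinkedHashMap<>())
                   .put(e.getKey(), e.getValue());
        }
        return batches;
    }

    public static void main(String[] args) {
        TopicPartition t1 = new TopicPartition("topic1", 5);
        TopicPartition t2 = new TopicPartition("topic2", 5);

        // Offsets taken from the log excerpt in this report.
        Map<TopicPartition, Long> offsets = new LinkedHashMap<>();
        offsets.put(t1, 88017574L);
        offsets.put(t2, 243841L);

        // Broker ids from the same logs: broker1 has id 2, broker2 has id 4.
        Map<TopicPartition, Integer> leaders = Map.of(t1, 2, t2, 4);

        groupByLeader(offsets, leaders).forEach((brokerId, batch) ->
                System.out.println("broker id " + brokerId + " <- " + batch));
    }
}
```

With the topics from the logs above, this yields one batch for broker id 2 
containing only {{topic1}} partition 5 and one batch for broker id 4 containing 
only {{topic2}} partition 5, rather than a single request carrying both.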
h2. Notes
 * _presumably_, introduced in 3.6.1 via 
[https://github.com/apache/kafka/pull/13760].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
