Hello,

Thank you for looking into this so quickly! Unfortunately, we didn't have
DEBUG logs enabled on the observer nodes at the time, as this was a
production environment.
However, I was able to reproduce the behaviour in a sandbox cluster with
the following setup (though the root cause may have differed from
production):

1. Spin up a Kafka cluster with 3 or 5 controllers (ideally 5, to increase
the likelihood of bootstrapping to a follower instead of the leader).
2. Enable a network delay on a particular observer broker (e.g. `tc qdisc
add dev eth0 root netem delay 2500ms`). I picked 2500ms since the default
timeout is 2s for both `controller.quorum.fetch.timeout.ms` and
`controller.quorum.request.timeout.ms`. After a few seconds, disable the
network delay (e.g. `tc qdisc del dev eth0 root netem`).
3. The observer node will re-bootstrap, potentially to a follower instead
of the leader. If so, the observer will continuously send fetch requests to
the follower node, receive `NOT_LEADER_OR_FOLLOWER` in response, and will
no longer replicate metadata (one way to check this is sketched below).
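
One way to check that the observer is stuck is to describe the quorum
through a broker (a rough sketch; the bootstrap host:port is a placeholder
for one of your brokers):

```
# Describe KRaft replication as seen by a broker. A stuck observer shows a
# growing lag and a stale last-fetch timestamp while the voters keep
# advancing their log end offsets.
bin/kafka-metadata-quorum.sh \
  --bootstrap-server broker-0.kafka.example.com:9092 \
  describe --replication
```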

I have linked relevant DEBUG logs of the observer, leader, and follower for
the above scenario:
https://gist.github.com/justin-chen/1f3eee79d9a5066a467818a0b1bc006f

Let me know if you need any more information.
Thank you!
Justin C


On Thu, May 29, 2025 at 9:38 PM Alyssa Huang <ahu...@confluent.io> wrote:

> Wanted to correct my wording and resend the original text of my email
> because I got a bounce back -
>
>> Thanks Justin,
>>
>> It's hard to say from the current details if it's simply a network issue
>> (e.g. broker never receives the response with the leaderId), bug (broker
>> does receive response with leaderId, never transitions to follower), or
>> something else. Could you potentially send over logs from the misbehaving
>> observer and whichever follower it is fetching from? Additionally, the
>> quorum state store file on the observer. I have a theory of how this might
>> be possible, but the additional logs + quorum state store file would help
>> confirm.
>>
>> Best,
>> Alyssa
>>
>
> correction:
>
>> bug (broker does receive response with leaderId, continues to fetch from
>> bootstrap servers instead of from the leader)
>
>
>
> Also, I believe I may have reproduced the issue with a test, but will give
> another update tomorrow.
>
> Best,
> Alyssa
>
> On Thu, May 29, 2025 at 3:30 PM Alyssa Huang <ahu...@confluent.io> wrote:
>
>> Thanks Justin,
>>
>> It's hard to say from the current details if it's simply a network issue
>> (e.g. broker never receives the response with the leaderId), bug (broker
>> does receive response with leaderId, never transitions to follower), or
>> something else. Could you potentially send over logs from the misbehaving
>> observer and whichever follower it is fetching from? Additionally, the
>> quorum state store file on the observer. I have a theory of how this might
>> be possible, but the additional logs + quorum state store file would help
>> confirm.
>>
>> Best,
>> Alyssa
>>
>> On Thu, May 29, 2025 at 12:36 PM Justin Chen
>> <justin.c...@shopify.com.invalid> wrote:
>>
>>> To correct my original description:
>>>
>>> We have observed that KRaft observers (process.roles=broker) that
>>> typically send FETCH requests to the quorum Leader node can enter a state
>>> of indefinitely **sending FETCH requests to a voter (follower) node**,
>>> which we believe happens after a re-bootstrap caused by some sort of
>>> request failure or timeout.
>>>
>>>
>>> On Thu, May 29, 2025 at 3:16 PM Justin Chen <justin.c...@shopify.com>
>>> wrote:
>>>
>>> > Hello,
>>> >
>>> > In our Kafka 4.0 cluster (dynamic quorum, 5 controller nodes), we have
>>> > observed that KRaft observers (process.roles=broker) that typically send
>>> > FETCH requests to the quorum Leader node can enter a state of
>>> > indefinitely re-bootstrapping to a voter (follower) node, likely after
>>> > some sort of request failure or timeout. Subsequently, the observer
>>> > node’s high watermark/metadata offset would not update, causing issues
>>> > such as out-of-sync replicas during partition reassignments. We also
>>> > observe a high rate of NOT_LEADER_OR_FOLLOWER errors
>>> > (kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=FETCH,error=NOT_LEADER_OR_FOLLOWER)
>>> > on the voter node that is receiving the FETCH requests.
>>> >
>>> > The observer would only be able to recover after restarting the voter
>>> > it (re-)bootstrapped to, which causes another re-bootstrap to a random
>>> > voter node. If by chance the observer connects to the correct leader
>>> > node, the metadata replication would recover and errors would stop.
>>> >
>>> > With DEBUG logs enabled on the KRaft controllers, we repeatedly see the
>>> > following log on the voter node that is incorrectly receiving the FETCH
>>> > requests:
>>> > ```
>>> > Completed request:{"isForwarded":false,"requestHeader":{"requestApiKey":1,"requestApiVersion":17,"correlationId":11131463,"clientId":"raft-client-81","requestApiKeyName":"FETCH"},"request":{"clusterId":"vD8YJMbtQMyOnzTfZ5RK4g","replicaState":{"replicaId":81,"replicaEpoch":-1},"maxWaitMs":500,"minBytes":0,"maxBytes":8388608,"isolationLevel":0,"sessionId":0,"sessionEpoch":-1,"topics":[{"topicId":"AAAAAAAAAAAAAAAAAAAAAQ","partitions":[{"partition":0,"currentLeaderEpoch":69,"fetchOffset":4793550,"lastFetchedEpoch":69,"logStartOffset":-1,"partitionMaxBytes":0,"replicaDirectoryId":"sJjyY5zzN1XLxEmDoWwVig"}]}],"forgottenTopicsData":[],"rackId":""},"response":{"throttleTimeMs":0,"errorCode":0,"sessionId":0,"responses":[{"topicId":"AAAAAAAAAAAAAAAAAAAAAQ","partitions":[{"partitionIndex":0,"errorCode":6,"highWatermark":-1,"lastStableOffset":-1,"logStartOffset":4782613,"currentLeader":{"leaderId":7002,"leaderEpoch":69},"abortedTransactions":[],"preferredReadReplica":-1,"recordsSizeInBytes":0}]}],"nodeEndpoints":[{"nodeId":7002,"host":"kraftcontroller-7002.kafka.<redacted>.com.","port":9095,"rack":null}]},"connection":"<redacted_serverIp>:<serverPort>-<redacted_clientIp>:<clientPort>","totalTimeMs":0.458,"requestQueueTimeMs":0.113,"localTimeMs":0.086,"remoteTimeMs":0.203,"throttleTimeMs":0,"responseQueueTimeMs":0.021,"sendTimeMs":0.032,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"KRAFT_CONTROLLER","clientInformation":{"softwareName":"apache-kafka-java","softwareVersion":"4.0.0"}}
>>> > ```
>>> >
>>> > Note that the top level error code is 0 (success), however the
>>> > `response.partitions[0].errorCode` is 6 (NOT_LEADER_OR_FOLLOWER).
>>> > Tracing through the FETCH logic of the KafkaRaftClient, it seems the
>>> > response is handled “successfully” by the `maybeHandleCommonResponse`
>>> > method (
>>> > https://github.com/apache/kafka/blob/4.0.0/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1707-L1717
>>> > ), yet the correct leader in the response (node 7002) is not used for
>>> > subsequent requests. The Raft client would continue sending to the
>>> > incorrect voter node and never re-bootstrap/backoff (
>>> > https://github.com/apache/kafka/blob/4.0.0/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L3224-L3225
>>> > ) until a voter node restart.
>>> >
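>>> > For reference, the partition-level error code and the leader hint can
>>> > be pulled out of these request-log entries with something like the
>>> > following (a rough sketch; the log file name is a placeholder and `jq`
>>> > is assumed to be available):
>>> >
>>> > ```
>>> > # Extract errorCode and currentLeader from FETCH entries in the
>>> > # controller's request log; errorCode 6 (NOT_LEADER_OR_FOLLOWER) shows
>>> > # up even though the top-level errorCode is 0.
>>> > grep 'Completed request' kafka-request.log \
>>> >   | grep '"requestApiKeyName":"FETCH"' \
>>> >   | sed 's/^.*Completed request://' \
>>> >   | jq -c '.response.responses[].partitions[] | {errorCode, currentLeader}'
>>> > ```
>>> >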
>>> > We configure each node’s `controller.quorum.bootstrap.servers` to a
>>> > single host that load balances to 1 of 5 KRaft controllers, but I do not
>>> > believe that explicitly listing all 5 host:port strings would prevent
>>> > this issue.
>>> >
>>> > I wanted to confirm if this is indeed a bug within KRaft, or a
>>> > potential misconfiguration on our end.
>>> >
>>> > Thank you!
>>> > Justin C
>>> >
>>> >
>>>
>>> --
>>> Regards,
>>> Justin Chen
>>>
>>
