[ 
https://issues.apache.org/jira/browse/KAFKA-16927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio resolved KAFKA-16927.
------------------------------------------------
    Resolution: Fixed

> Handle expanding leader endpoints
> ---------------------------------
>
>                 Key: KAFKA-16927
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16927
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: kraft
>            Reporter: Alyssa Huang
>            Assignee: José Armando García Sancio
>            Priority: Blocker
>             Fix For: 3.9.0
>
>
> An inactive controller fails to restart if the active leader has 
> more endpoints than the latest voter set. The easiest way to reproduce 
> this is with the following configuration:
> {code:java}
> cat kafka.properties | grep controller.quorum
> controller.quorum.voters=0@controller-0:1234,1@controller-1:1234,2@controller-2:1234{code}
> This is what is executed in the QuorumState loading code:
> {code:java}
>               if (leaderEndpoints.isEmpty()) {
>                   ...
>               } else {
>                   initialState = new FollowerState(
>                       time,
>                       election.epoch(),
>                       election.leaderId(),
>                       leaderEndpoints,
>                       voters.voterIds(),
>                       Optional.empty(),
>                       fetchTimeoutMs,
>                       logContext
>                   );
>               }{code}
> If the leader has two endpoints it will send the following BEGIN_QUORUM_EPOCH 
> request:
> {code:java}
> "leaderEndpoints":[{"name":"CONTROLLER_PLAINTEXT","host":"controller-0","port":1234},{"name":"CONTROLLER","host":"controller-0","port":4321}]{code}
> And this code doesn't handle that correctly:
> {code:java}
>           } else if (
>                   leaderId.isPresent() &&
>                   (!quorum.hasLeader() || leaderEndpoints.size() > quorum.leaderEndpoints().size())
>           ) {
>               // The request or response indicates the leader of the current epoch
>               // which are currently unknown or the replica has discovered more endpoints
>               transitionToFollower(epoch, leaderId.getAsInt(), leaderEndpoints, currentTimeMs);
>           }{code}
> After adding a test for this, the test fails with the following:
> {code:java}
> Gradle Test Run :raft:test > Gradle Test Executor 44 > KafkaRaftClientTest > testHandleBeginQuorumRequestMoreEndpoints() FAILED
>     java.lang.AssertionError: Assertion failed with an exception
>         at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:453)
>         at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)
>         at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
>         at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:367)
>         at org.apache.kafka.raft.RaftClientTestContext.pollUntil(RaftClientTestContext.java:617)
>         at org.apache.kafka.raft.RaftClientTestContext.pollUntilResponse(RaftClientTestContext.java:624)
>         at org.apache.kafka.raft.KafkaRaftClientTest.testHandleBeginQuorumRequestMoreEndpoints(KafkaRaftClientTest.java:993)
>         Caused by:
>         java.lang.IllegalStateException: Cannot transition to Follower with leader 829 and epoch 3 from state FollowerState(fetchTimeoutMs=50000, epoch=3, leader=829, leaderEndpoints=Endpoints(endpoints={ListenerName(LISTENER)=localhost/<unresolved>:10819}), voters=[828, 829], highWatermark=Optional.empty, fetchingSnapshot=Optional.empty)
>             at org.apache.kafka.raft.QuorumState.transitionToFollower(QuorumState.java:480)
>             at org.apache.kafka.raft.KafkaRaftClient.transitionToFollower(KafkaRaftClient.java:732)
>             at org.apache.kafka.raft.KafkaRaftClient.maybeTransition(KafkaRaftClient.java:2434)
>             at org.apache.kafka.raft.KafkaRaftClient.handleBeginQuorumEpochRequest(KafkaRaftClient.java:1018)
>             at org.apache.kafka.raft.KafkaRaftClient.handleRequest(KafkaRaftClient.java:2565)
>             at org.apache.kafka.raft.KafkaRaftClient.handleInboundMessage(KafkaRaftClient.java:2613)
>             at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:3314)
>             at org.apache.kafka.raft.RaftClientTestContext.lambda$pollUntil$1(RaftClientTestContext.java:618)
>             at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)
>             at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)
>             ... 6 more
> 1 test completed, 1 failed {code}
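
For readers following the description above, here is a minimal, self-contained sketch of why the quoted condition ends in an IllegalStateException. It is not Kafka code: the class EndpointTransitionSketch, the Quorum type, the Action enum, and the second listener name are all hypothetical, and the rule that a follower may not re-enter the follower state for the same leader and epoch is an assumption inferred from the exception message in the test output. The alternative() method shows one possible way to treat extra endpoints (update them in place); it is not necessarily the change that resolved this issue.

```java
import java.util.Map;
import java.util.OptionalInt;

public class EndpointTransitionSketch {
    /** Hypothetical, simplified view of the local quorum state; not Kafka's QuorumState. */
    static final class Quorum {
        final int epoch;
        final OptionalInt leaderId;
        final Map<String, String> leaderEndpoints; // listener name -> "host:port"

        Quorum(int epoch, OptionalInt leaderId, Map<String, String> leaderEndpoints) {
            this.epoch = epoch;
            this.leaderId = leaderId;
            this.leaderEndpoints = leaderEndpoints;
        }
    }

    enum Action { NONE, TRANSITION_TO_FOLLOWER, UPDATE_ENDPOINTS_ONLY }

    /** Mirrors the condition quoted in the description (same-epoch case only). */
    static Action quotedCondition(Quorum current, OptionalInt leaderId, Map<String, String> endpoints) {
        if (leaderId.isPresent()
                && (!current.leaderId.isPresent() || endpoints.size() > current.leaderEndpoints.size())) {
            return Action.TRANSITION_TO_FOLLOWER;
        }
        return Action.NONE;
    }

    /** Assumed guard: re-entering follower state for the same leader and epoch is rejected. */
    static void transitionToFollower(Quorum current, int epoch, int leaderId) {
        if (current.epoch == epoch
                && current.leaderId.isPresent()
                && current.leaderId.getAsInt() == leaderId) {
            throw new IllegalStateException(
                "Cannot transition to Follower with leader " + leaderId + " and epoch " + epoch);
        }
    }

    /** One possible alternative (an assumption): keep the state and only record the new endpoints. */
    static Action alternative(Quorum current, int epoch, OptionalInt leaderId, Map<String, String> endpoints) {
        if (!leaderId.isPresent()) {
            return Action.NONE;
        }
        if (!current.leaderId.isPresent()) {
            return Action.TRANSITION_TO_FOLLOWER;
        }
        boolean sameLeaderAndEpoch =
            current.epoch == epoch && current.leaderId.getAsInt() == leaderId.getAsInt();
        if (sameLeaderAndEpoch && endpoints.size() > current.leaderEndpoints.size()) {
            return Action.UPDATE_ENDPOINTS_ONLY;
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        // Follower of leader 829 in epoch 3 that knows a single endpoint, as in the failing test.
        Quorum current = new Quorum(3, OptionalInt.of(829), Map.of("LISTENER", "localhost:10819"));
        // The leader now advertises two endpoints; the second listener is made up for illustration.
        Map<String, String> advertised =
            Map.of("LISTENER", "localhost:10819", "EXTRA_LISTENER", "localhost:10820");

        System.out.println("quoted condition: " + quotedCondition(current, OptionalInt.of(829), advertised));
        try {
            transitionToFollower(current, 3, 829);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println("alternative: " + alternative(current, 3, OptionalInt.of(829), advertised));
    }
}
```

In this model the quoted condition selects a follower transition purely because the endpoint count grew, and the assumed guard then rejects it because leader and epoch are unchanged. Separating "new leader discovered" from "more endpoints for the same leader" avoids that path.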



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
