This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 88a23dab3ea KAFKA-18844: Stale features information in QuorumController#registerBroker (#18997)
88a23dab3ea is described below
commit 88a23dab3ea6f76cc32066066149ce7843bf24ab
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Feb 28 01:30:51 2025 +0800
KAFKA-18844: Stale features information in QuorumController#registerBroker (#18997)
In https://github.com/apache/kafka/pull/16848, we added `kraft.version`
to the finalized features and read the finalized features outside the
controller event handling thread. As a result, the finalized features
may be stale by the time the `registerBroker` event is processed. Some
tests, such as `QuorumControllerTest.testBalancePartitionLeaders`, also
became flaky because of an outdated MV (metadata version). This PR moves
the read of the finalized features back into the controller event
handling thread to avoid the error.
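
The staleness described above can be reproduced in isolation. The
following is a minimal sketch, not Kafka code: `StaleFeaturesSketch`,
`appendEvent`, `registerWithEarlySnapshot` and `registerInsideEvent` are
hypothetical names, and a single-threaded executor stands in for the
controller event handling thread. It assumes an upgrade event is already
queued when the registration request arrives.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class StaleFeaturesSketch {
    // Hypothetical stand-in for controller state; only events mutate it.
    private final Map<String, Short> features = new HashMap<>();
    // Single thread standing in for the controller event handling thread.
    private final ExecutorService eventThread = Executors.newSingleThreadExecutor();

    StaleFeaturesSketch() {
        features.put("metadata.version", (short) 1);
    }

    // Queue an operation; events run one at a time, in submission order.
    <T> CompletableFuture<T> appendEvent(Supplier<T> op) {
        return CompletableFuture.supplyAsync(op, eventThread);
    }

    // Old shape: the snapshot is taken on the caller's thread, before the
    // event runs, so it misses updates from events queued ahead of it.
    CompletableFuture<Short> registerWithEarlySnapshot(String name) {
        Map<String, Short> snapshot = new HashMap<>(features);
        return appendEvent(() -> snapshot.getOrDefault(name, (short) 0));
    }

    // New shape: the state is read inside the event itself.
    CompletableFuture<Short> registerInsideEvent(String name) {
        return appendEvent(() -> features.getOrDefault(name, (short) 0));
    }

    // Returns { value seen by the early snapshot, value seen inside the event }.
    static short[] demo() throws Exception {
        StaleFeaturesSketch c = new StaleFeaturesSketch();
        CountDownLatch gate = new CountDownLatch(1);
        // Hold the event thread so that the following events stay queued.
        c.appendEvent(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return null;
        });
        // An upgrade that is queued but has not been applied yet.
        c.appendEvent(() -> c.features.put("metadata.version", (short) 2));
        CompletableFuture<Short> early = c.registerWithEarlySnapshot("metadata.version");
        CompletableFuture<Short> late = c.registerInsideEvent("metadata.version");
        gate.countDown();
        short[] result = { early.get(), late.get() };
        c.eventThread.shutdown();
        return result;
    }

    public static void main(String[] args) throws Exception {
        short[] r = demo();
        System.out.println("early snapshot=" + r[0] + ", in-event read=" + r[1]);
        // prints "early snapshot=1, in-event read=2"
    }
}
```

In this sketch the early snapshot still reports level 1 after the
upgrade has been applied, while the read performed inside the event
reports level 2.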
Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]>,
Colin P. McCabe <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../org/apache/kafka/controller/QuorumController.java | 17 ++++++++++-------
1 file changed, 10 insertions(+), 7 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index bad7c8fbb84..fc5f99358f2 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -2004,14 +2004,17 @@ public final class QuorumController implements Controller {
         ControllerRequestContext context,
         BrokerRegistrationRequestData request
     ) {
-        // populate finalized features map with latest known kraft version for validation
-        Map<String, Short> controllerFeatures = new HashMap<>(featureControl.finalizedFeatures(Long.MAX_VALUE).featureMap());
-        controllerFeatures.put(KRaftVersion.FEATURE_NAME, raftClient.kraftVersion().featureLevel());
         return appendWriteEvent("registerBroker", context.deadlineNs(),
-            () -> clusterControl.
-                registerBroker(request, offsetControl.nextWriteOffset(),
-                    new FinalizedControllerFeatures(controllerFeatures, Long.MAX_VALUE),
-                    context.requestHeader().requestApiVersion() >= 3),
+            () -> {
+                // Read and write data in the controller event handling thread to avoid stale information.
+                Map<String, Short> controllerFeatures = new HashMap<>(featureControl.finalizedFeatures(Long.MAX_VALUE).featureMap());
+                // Populate finalized features map with latest known kraft version for validation.
+                controllerFeatures.put(KRaftVersion.FEATURE_NAME, raftClient.kraftVersion().featureLevel());
+                return clusterControl.
+                    registerBroker(request, offsetControl.nextWriteOffset(),
+                        new FinalizedControllerFeatures(controllerFeatures, Long.MAX_VALUE),
+                        context.requestHeader().requestApiVersion() >= 3);
+            },
             EnumSet.noneOf(ControllerOperationFlag.class));
     }