cmccabe commented on code in PR #17502:
URL: https://github.com/apache/kafka/pull/17502#discussion_r1813273750
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1597,18 +1597,34 @@ public ControllerResult<Void> unregisterBroker(int brokerId) {
return ControllerResult.of(records, null);
}
- ControllerResult<Void> maybeFenceOneStaleBroker() {
- List<ApiMessageAndVersion> records = new ArrayList<>();
+ ControllerResult<Boolean> maybeFenceOneStaleBroker() {
BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
- heartbeatManager.findOneStaleBroker().ifPresent(brokerId -> {
- // Even though multiple brokers can go stale at a time, we will process
- // fencing one at a time so that the effect of fencing each broker is visible
- // to the system prior to processing the next one
- log.info("Fencing broker {} because its session has timed out.", brokerId);
- handleBrokerFenced(brokerId, records);
- heartbeatManager.fence(brokerId);
- });
- return ControllerResult.of(records, null);
+ Optional<BrokerIdAndEpoch> idAndEpoch = heartbeatManager.tracker().maybeRemoveExpired();
Review Comment:
Yes, they remain until they expire. We always check which epoch each entry
has, so ignoring the old ones is easy.
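
To make that concrete, here is a rough sketch of the idea, not the code in this PR. Every name in it (`StaleSessionSketch`, `Entry`, `register`, `maybeFenceOne`) is hypothetical, and it assumes a fixed session timeout so that insertion order matches deadline order. Old entries stay in the queue until their deadline passes, and the epoch comparison is what makes them harmless.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration only; these are not the PR's actual classes.
public class StaleSessionSketch {
    // One tracked session: the broker, its registration epoch, and its deadline.
    static final class Entry {
        final int brokerId;
        final long epoch;
        final long deadlineNs;

        Entry(int brokerId, long epoch, long deadlineNs) {
            this.brokerId = brokerId;
            this.epoch = epoch;
            this.deadlineNs = deadlineNs;
        }
    }

    // Entries in deadline order (assumption: fixed timeout, so FIFO == deadline order).
    private final ArrayDeque<Entry> queue = new ArrayDeque<>();
    // The current registration epoch of each broker.
    private final Map<Integer, Long> currentEpochs = new HashMap<>();

    // A re-registration adds a new entry; the older entry is NOT removed here.
    void register(int brokerId, long epoch, long deadlineNs) {
        currentEpochs.put(brokerId, epoch);
        queue.addLast(new Entry(brokerId, epoch, deadlineNs));
    }

    // Pops at most one entry whose deadline has passed.
    Optional<Entry> maybeRemoveExpired(long nowNs) {
        Entry head = queue.peekFirst();
        if (head == null || head.deadlineNs > nowNs) {
            return Optional.empty();
        }
        return Optional.of(queue.removeFirst());
    }

    // Returns the fenced broker id, or empty if nothing expired or the
    // expired entry belonged to an older epoch and was simply dropped.
    Optional<Integer> maybeFenceOne(long nowNs) {
        Optional<Entry> expired = maybeRemoveExpired(nowNs);
        if (!expired.isPresent()) {
            return Optional.empty();
        }
        Entry entry = expired.get();
        Long current = currentEpochs.get(entry.brokerId);
        if (current == null || current != entry.epoch) {
            return Optional.empty(); // leftover from an old registration: ignore it
        }
        currentEpochs.remove(entry.brokerId); // stands in for the real fencing logic
        return Optional.of(entry.brokerId);
    }
}
```

In this sketch, a broker that registers twice leaves two entries behind. When the first one expires, its epoch no longer matches the current one, so it is dropped without fencing anything.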