cmccabe commented on code in PR #17502:
URL: https://github.com/apache/kafka/pull/17502#discussion_r1811524670
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1597,18 +1597,34 @@ public ControllerResult<Void> unregisterBroker(int brokerId) {
return ControllerResult.of(records, null);
}
- ControllerResult<Void> maybeFenceOneStaleBroker() {
- List<ApiMessageAndVersion> records = new ArrayList<>();
+ ControllerResult<Boolean> maybeFenceOneStaleBroker() {
BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
- heartbeatManager.findOneStaleBroker().ifPresent(brokerId -> {
- // Even though multiple brokers can go stale at a time, we will process
- // fencing one at a time so that the effect of fencing each broker is visible
- // to the system prior to processing the next one
- log.info("Fencing broker {} because its session has timed out.", brokerId);
- handleBrokerFenced(brokerId, records);
- heartbeatManager.fence(brokerId);
- });
- return ControllerResult.of(records, null);
+ Optional<BrokerIdAndEpoch> idAndEpoch = heartbeatManager.tracker().maybeRemoveExpired();
+ if (!idAndEpoch.isPresent()) {
+ log.debug("No stale brokers found.");
+ return ControllerResult.of(Collections.emptyList(), false);
+ }
+ int id = idAndEpoch.get().id();
+ long epoch = idAndEpoch.get().epoch();
+ if (!clusterControl.brokerRegistrations().containsKey(id)) {
+ log.info("Removing heartbeat tracker entry for unknown broker {} at epoch {}.",
+ id, epoch);
+ heartbeatManager.remove(id);
+ return ControllerResult.of(Collections.emptyList(), true);
+ } else if (clusterControl.brokerRegistrations().get(id).epoch() != epoch) {
+ log.info("Removing heartbeat tracker entry for broker {} at previous epoch {}. " +
+ "Current epoch is {}", id, epoch,
+ clusterControl.brokerRegistrations().get(id).epoch());
+ return ControllerResult.of(Collections.emptyList(), true);
+ }
+ // Even though multiple brokers can go stale at a time, we will process
+ // fencing one at a time so that the effect of fencing each broker is visible
+ // to the system prior to processing the next one.
+ log.info("Fencing broker {} at epoch {} because its session has timed out.", id, epoch);
Review Comment:
Only errors should be logged at ERROR level. The broker being fenced is not
an error. It will happen on rolls, for example.
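
To illustrate the level-selection rule, here is a minimal standalone sketch. It is not code from the PR: it uses `java.util.logging` rather than the logger the controller uses, and the class name, the `Event` enum, and `levelFor` are all hypothetical. Expected operational events, such as a session-timeout fence during a rolling restart, map to INFO, and only a genuine failure maps to the error level (`SEVERE` in `java.util.logging`).

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class FenceLogLevelSketch {
    private static final Logger LOG =
        Logger.getLogger(FenceLogLevelSketch.class.getName());

    /** Hypothetical event kinds for illustration; not taken from the Kafka code base. */
    enum Event { SESSION_TIMEOUT_FENCE, STALE_TRACKER_ENTRY, RECORD_WRITE_FAILURE }

    /** Chooses a level: expected operational events are INFO, real failures are SEVERE. */
    static Level levelFor(Event event) {
        switch (event) {
            case SESSION_TIMEOUT_FENCE:
            case STALE_TRACKER_ENTRY:
                // Routine events, e.g. brokers being fenced during a roll.
                return Level.INFO;
            case RECORD_WRITE_FAILURE:
                // Something actually went wrong.
                return Level.SEVERE;
            default:
                throw new IllegalArgumentException("Unknown event: " + event);
        }
    }

    public static void main(String[] args) {
        int brokerId = 1;   // assumed example values
        long epoch = 100L;
        LOG.log(levelFor(Event.SESSION_TIMEOUT_FENCE),
            "Fencing broker {0} at epoch {1} because its session has timed out.",
            new Object[] {brokerId, epoch});
    }
}
```

Keeping routine fencing at INFO means a rolling restart does not produce error-level noise that operators would have to triage.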