rondagostino commented on code in PR #14838: URL: https://github.com/apache/kafka/pull/14838#discussion_r1414131035
########## metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java: ########## @@ -61,6 +67,11 @@ static class BrokerHeartbeatState { */ private final int id; + /** + * The directories which the broker currently has available. + */ + private final List<Uuid> directories; Review Comment: Can we rename this to `directoriesSorted` or `sortedDirectories` to indicate that this list must be sorted? We depend on it being sorted when invoking `DirectoryId.isOnline()`. And when I look at where we invoke that method, it seems that the field `BrokerRegistration.directories` should get a similar rename. The other alternative is to remove the constraint that the list be sorted within `DirectoryId.isOnline()`. Since we don't expect to have a large number of directories, I would lean toward this option in order to avoid bugs down the road if we forget to sort a list somewhere. Could we then remove the sorting of the list in this class and within `BrokerRegistration` and in any tests? Or am I missing something and we really do need to keep these sorted? ########## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ########## @@ -284,7 +286,7 @@ ReplicaPlacer replicaPlacer() { public void activate() { heartbeatManager = new BrokerHeartbeatManager(logContext, time, sessionTimeoutNs); for (BrokerRegistration registration : brokerRegistrations.values()) { - heartbeatManager.touch(registration.id(), registration.fenced(), -1); + heartbeatManager.register(registration.id(), registration.fenced(), registration.directories()); Review Comment: This change seems a bit odd to me. I think it will work, and in fact I think it wil do exactly what was being done before, but you moved the creation of the broker entry in the heartbeat manager's internal map into the `register()` method and removed it from the `touch()` method -- which I think is a god change. But now this reads weirdly because we aren't registering a broker -- the broker should already have been registered by this point, and we just want to "touch" it. I think this change can be reverted? ########## metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java: ########## @@ -272,8 +273,16 @@ public void testRegistrationWithIncorrectClusterId() { (short) 1)); } + private static Stream<Arguments> metadataVersions() { + return Stream.of( + MetadataVersion.IBP_3_3_IV2, + MetadataVersion.IBP_3_3_IV3, + MetadataVersion.latest() Review Comment: Maybe we want to add in `MetadataVersion.LATEST_PRODUCTION` as well? ########## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ########## @@ -354,6 +356,25 @@ public ControllerResult<BrokerRegistrationReply> registerBroker( throw new BrokerIdNotRegisteredException("Controller is in pre-migration mode and cannot register KRaft " + "brokers until the metadata migration is complete."); } + + if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) { + Set<Uuid> set = new HashSet<>(request.logDirs()); + if (set.stream().anyMatch(DirectoryId::reserved)) { + throw new DuplicateBrokerRegistrationException("Reserved directory ID in request"); Review Comment: Is `DuplicateBrokerRegistrationException` appropriate, do you think? Seems like `BrokerIdNotRegisteredException` might be better, and then we would have to make an update in `testRegisterWithDuplicateDirectoryId()` to check for that other exception class. ########## metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java: ########## @@ -347,7 +347,7 @@ private void maybePublishMetadata(MetadataDelta delta, MetadataImage image, Load } metrics.updateLastAppliedImageProvenance(image.provenance()); metrics.setCurrentMetadataVersion(image.features().metadataVersion()); - if (uninitializedPublishers.isEmpty()) { + if (!uninitializedPublishers.isEmpty()) { Review Comment: Nice catch -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org