soarez commented on code in PR #14838: URL: https://github.com/apache/kafka/pull/14838#discussion_r1416458244
########## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ########## @@ -354,6 +357,25 @@ public ControllerResult<BrokerRegistrationReply> registerBroker( throw new BrokerIdNotRegisteredException("Controller is in pre-migration mode and cannot register KRaft " + "brokers until the metadata migration is complete."); } + + if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) { + Set<Uuid> set = new HashSet<>(request.logDirs()); + if (set.stream().anyMatch(DirectoryId::reserved)) { + throw new InvalidRequestException("Reserved directory ID in request"); + } + if (set.size() != request.logDirs().size()) { + throw new InvalidRequestException("Duplicate directory ID in request"); + } + for (BrokerRegistration registration : brokerRegistrations().values()) { Review Comment: @cmccabe: > InvalidRequestException seems too generic I've introduced InvalidRegistrationException to handle these. > Also, we shouldn't be checking the previous registration for this specific broker ID. I forgot that. Thanks for pointing this out! > we should just have a TimelineHashMap<Uuid, Integer> that maps log dir UUID to the broker that has it Great suggestion. Added. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org