cmccabe commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1416327866


##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -354,6 +357,25 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
             throw new BrokerIdNotRegisteredException("Controller is in pre-migration mode and cannot register KRaft " +
                 "brokers until the metadata migration is complete.");
         }
+
+        if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) {
+            Set<Uuid> set = new HashSet<>(request.logDirs());
+            if (set.stream().anyMatch(DirectoryId::reserved)) {
+                throw new InvalidRequestException("Reserved directory ID in request");
+            }
+            if (set.size() != request.logDirs().size()) {
+                throw new InvalidRequestException("Duplicate directory ID in request");
+            }
+            for (BrokerRegistration registration : brokerRegistrations().values()) {

Review Comment:
   Another thing. This is `O(num_brokers * num_directories)`, which isn't good for larger deployments. I think we should just have a `TimelineHashMap<Uuid, Integer>` that maps each log dir UUID to the broker that has it. The main hassle is removing the mappings during broker unregistration, but that is rare, of course.
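   A minimal standalone sketch of the reverse index suggested above. Assumptions: a plain `HashMap` and `java.util.UUID` stand in for Kafka's snapshot-aware `TimelineHashMap` and its `Uuid` type, and the class and method names (`DirectoryIndex`, `register`, `unregister`) are hypothetical, not the controller's actual API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: a reverse index from log directory UUID to the broker
// that registered it, so a registration check is O(num_directories in the
// request) instead of O(num_brokers * num_directories). In the controller
// this would be a TimelineHashMap<Uuid, Integer>; a plain HashMap stands in.
class DirectoryIndex {
    private final Map<UUID, Integer> dirToBroker = new HashMap<>();

    // Returns the broker id currently owning dirId, or null if unowned.
    Integer ownerOf(UUID dirId) {
        return dirToBroker.get(dirId);
    }

    // Called on broker registration: reject any dir already owned by
    // a different broker, then record the mappings.
    void register(int brokerId, List<UUID> logDirs) {
        for (UUID dir : logDirs) {
            Integer owner = dirToBroker.get(dir);
            if (owner != null && owner != brokerId) {
                throw new IllegalArgumentException("Directory " + dir +
                    " already registered to broker " + owner);
            }
        }
        for (UUID dir : logDirs) {
            dirToBroker.put(dir, brokerId);
        }
    }

    // The rare path the comment mentions: on broker unregistration,
    // drop every mapping that points at that broker.
    void unregister(int brokerId) {
        dirToBroker.values().removeIf(owner -> owner == brokerId);
    }
}
```

   The two-pass shape of `register` (validate all dirs, then insert) keeps the index unchanged when the request is rejected, mirroring how the controller only applies records after validation succeeds.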





-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
