rondagostino commented on code in PR #14838:
URL: https://github.com/apache/kafka/pull/14838#discussion_r1414131035


##########
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##########
@@ -61,6 +67,11 @@ static class BrokerHeartbeatState {
          */
         private final int id;
 
+        /**
+         * The directories which the broker currently has available.
+         */
+        private final List<Uuid> directories;

Review Comment:
   Can we rename this to `directoriesSorted` or `sortedDirectories` to indicate 
that this list must be sorted?  We depend on it being sorted when invoking 
`DirectoryId.isOnline()`.  And when I look at where we invoke that method, it 
seems that the field `BrokerRegistration.directories` should get a similar 
rename.
   
   The other alternative is to remove the constraint that the list be sorted 
within `DirectoryId.isOnline()`.  Since we don't expect to have a large number 
of directories, I would lean toward this option in order to avoid bugs down the 
road if we forget to sort a list somewhere.  Could we then remove the sorting 
of the list in this class and within `BrokerRegistration` and in any tests?  Or 
am I missing something and we really do need to keep these sorted?
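
   To illustrate the trade-off, here is a minimal sketch. It assumes the 
sorted requirement comes from a binary search inside the check, which this 
comment does not confirm. `DirectoryOnlineSketch` and both method names are 
hypothetical, and `java.util.UUID` stands in for Kafka's `Uuid` so the snippet 
compiles on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

// Hypothetical stand-in, not the real DirectoryId class.
final class DirectoryOnlineSketch {
    private DirectoryOnlineSketch() { }

    // Order-sensitive variant: Collections.binarySearch is only
    // guaranteed to be correct when the list is already sorted.
    static boolean isOnlineSorted(UUID dir, List<UUID> sortedOnlineDirs) {
        return Collections.binarySearch(sortedOnlineDirs, dir) >= 0;
    }

    // Order-insensitive variant: a linear scan, which costs little
    // when a broker only has a handful of directories.
    static boolean isOnlineAnyOrder(UUID dir, List<UUID> onlineDirs) {
        return onlineDirs.contains(dir);
    }

    public static void main(String[] args) {
        List<UUID> unsorted = Arrays.asList(
            new UUID(0L, 3L), new UUID(0L, 1L), new UUID(0L, 2L));
        List<UUID> sorted = new ArrayList<>(unsorted);
        Collections.sort(sorted);

        UUID target = new UUID(0L, 3L);
        System.out.println(isOnlineAnyOrder(target, unsorted));
        System.out.println(isOnlineSorted(target, sorted));
    }
}
```

   With the linear scan, callers no longer need to remember to sort, at the 
cost of an O(n) lookup over a list that is expected to stay small.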



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -284,7 +286,7 @@ ReplicaPlacer replicaPlacer() {
     public void activate() {
         heartbeatManager = new BrokerHeartbeatManager(logContext, time, sessionTimeoutNs);
         for (BrokerRegistration registration : brokerRegistrations.values()) {
-            heartbeatManager.touch(registration.id(), registration.fenced(), -1);
+            heartbeatManager.register(registration.id(), registration.fenced(), registration.directories());

Review Comment:
   This change seems a bit odd to me.  I think it will work, and in fact I 
think it will do exactly what was being done before, but you moved the creation 
of the broker entry in the heartbeat manager's internal map into the 
`register()` method and removed it from the `touch()` method -- which I think 
is a good change.  But now this reads weirdly because we aren't registering a 
broker -- the broker should already have been registered by this point, and we 
just want to "touch" it.  I think this change can be reverted?
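
   A minimal sketch of the split described above, under stated assumptions: 
`HeartbeatSketch` is hypothetical and far simpler than 
`BrokerHeartbeatManager`, and having `touch()` throw for an unknown broker is 
an assumption made for illustration, not something this PR confirms.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the register/touch split.
final class HeartbeatSketch {
    // Broker ID mapped to the time of last contact, in nanoseconds.
    private final Map<Integer, Long> lastContact = new HashMap<>();

    // register() is the only method that creates a map entry.
    void register(int brokerId, long nowNs) {
        lastContact.put(brokerId, nowNs);
    }

    // touch() only refreshes an existing entry. Map.replace returns null
    // when the key has no mapping, which is treated as an error here.
    void touch(int brokerId, long nowNs) {
        if (lastContact.replace(brokerId, nowNs) == null) {
            throw new IllegalStateException("Broker " + brokerId + " is not registered");
        }
    }

    // Returns the last contact time, or -1 if the broker is unknown.
    long lastContactNs(int brokerId) {
        return lastContact.getOrDefault(brokerId, -1L);
    }
}
```

   Under this model, a caller that rebuilds the manager from existing 
registrations has to create the entries somehow, which is why `activate()` 
ends up calling `register()` even though the name reads oddly there.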



##########
metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java:
##########
@@ -272,8 +273,16 @@ public void testRegistrationWithIncorrectClusterId() {
                 (short) 1));
     }
 
+    private static Stream<Arguments> metadataVersions() {
+        return Stream.of(
+                MetadataVersion.IBP_3_3_IV2,
+                MetadataVersion.IBP_3_3_IV3,
+                MetadataVersion.latest()

Review Comment:
   Maybe we want to add in `MetadataVersion.LATEST_PRODUCTION` as well?



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -354,6 +356,25 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
             throw new BrokerIdNotRegisteredException("Controller is in pre-migration mode and cannot register KRaft " +
                 "brokers until the metadata migration is complete.");
         }
+
+        if (featureControl.metadataVersion().isDirectoryAssignmentSupported()) {
+            Set<Uuid> set = new HashSet<>(request.logDirs());
+            if (set.stream().anyMatch(DirectoryId::reserved)) {
+                throw new DuplicateBrokerRegistrationException("Reserved directory ID in request");

Review Comment:
   Is `DuplicateBrokerRegistrationException` appropriate, do you think?  Seems 
like `BrokerIdNotRegisteredException` might be better, and then we would have 
to make an update in `testRegisterWithDuplicateDirectoryId()` to check for that 
other exception class.



##########
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##########
@@ -347,7 +347,7 @@ private void maybePublishMetadata(MetadataDelta delta, MetadataImage image, Load
         }
         metrics.updateLastAppliedImageProvenance(image.provenance());
         metrics.setCurrentMetadataVersion(image.features().metadataVersion());
-        if (uninitializedPublishers.isEmpty()) {
+        if (!uninitializedPublishers.isEmpty()) {

Review Comment:
   Nice catch



