rondagostino commented on code in PR #14863: URL: https://github.com/apache/kafka/pull/14863#discussion_r1416193447
########## core/src/test/scala/unit/kafka/server/ControllerApisTest.scala: ########## @@ -1139,31 +1139,24 @@ class ControllerApisTest { } @Test - def testAssignReplicasToDirsReturnsUnsupportedVersion(): Unit = { + def testAssignReplicasToDirs(): Unit = { val controller = mock(classOf[Controller]) - val controllerApis = createControllerApis(None, controller) + val authorizer = mock(classOf[Authorizer]) + val controllerApis = createControllerApis(Some(authorizer), controller) + + val request = new AssignReplicasToDirsRequest.Builder(new AssignReplicasToDirsRequestData()).build() + + when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(Collections.singletonList(new Action( + AclOperation.CLUSTER_ACTION, + new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL), + 1, true, true + ))))) + .thenReturn(Collections.singletonList(AuthorizationResult.ALLOWED)) + when(controller.assignReplicasToDirs(any[ControllerRequestContext], ArgumentMatchers.eq(request.data))) + .thenThrow(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()) Review Comment: Need to return something like `completableFuture.completeExceptionally()` here as opposed to throwing an exception directly. 
########## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ########## @@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData listPartitionReassignments( return response; } + ControllerResult<AssignReplicasToDirsResponseData> handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) { + int brokerId = request.brokerId(); + clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch()); + BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId); + List<ApiMessageAndVersion> records = new ArrayList<>(); + AssignReplicasToDirsResponseData response = new AssignReplicasToDirsResponseData(); + Set<TopicIdPartition> leaderAndIsrUpdates = new HashSet<>(); + for (AssignReplicasToDirsRequestData.DirectoryData reqDir : request.directories()) { + Uuid dirId = reqDir.id(); + AssignReplicasToDirsResponseData.DirectoryData resDir = new AssignReplicasToDirsResponseData.DirectoryData().setId(dirId); + for (AssignReplicasToDirsRequestData.TopicData reqTopic : reqDir.topics()) { + Uuid topicId = reqTopic.topicId(); + Errors topicError = Errors.NONE; + TopicControlInfo topicControl = this.topics.get(topicId); Review Comment: nit: `topicControlInfo` or `topicInfo` might be a better name than `topicControl` -- get the "Info" in there somehow. 
########## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ########## @@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData listPartitionReassignments( return response; } + ControllerResult<AssignReplicasToDirsResponseData> handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) { + int brokerId = request.brokerId(); + clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch()); + BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId); + List<ApiMessageAndVersion> records = new ArrayList<>(); + AssignReplicasToDirsResponseData response = new AssignReplicasToDirsResponseData(); + Set<TopicIdPartition> leaderAndIsrUpdates = new HashSet<>(); + for (AssignReplicasToDirsRequestData.DirectoryData reqDir : request.directories()) { + Uuid dirId = reqDir.id(); + AssignReplicasToDirsResponseData.DirectoryData resDir = new AssignReplicasToDirsResponseData.DirectoryData().setId(dirId); + for (AssignReplicasToDirsRequestData.TopicData reqTopic : reqDir.topics()) { + Uuid topicId = reqTopic.topicId(); + Errors topicError = Errors.NONE; + TopicControlInfo topicControl = this.topics.get(topicId); + if (topicControl == null) { + log.warn("AssignReplicasToDirsRequest from broker {} references unknown topic ID {}", brokerId, topicId); + topicError = Errors.UNKNOWN_TOPIC_ID; + } + AssignReplicasToDirsResponseData.TopicData resTopic = new AssignReplicasToDirsResponseData.TopicData().setTopicId(topicId); + for (AssignReplicasToDirsRequestData.PartitionData reqPartition : reqTopic.partitions()) { + int partitionIndex = reqPartition.partitionIndex(); + Errors partitionError = topicError; + if (topicError == Errors.NONE) { + String topicName = topicControl.name; + PartitionRegistration partitionRegistration = topicControl.parts.get(partitionIndex); + if (partitionRegistration == null) { + log.warn("AssignReplicasToDirsRequest from broker {} references unknown partition {}-{}", brokerId, 
topicName, partitionIndex); + partitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION; + } else if (!Replicas.contains(partitionRegistration.replicas, brokerId)) { + log.warn("AssignReplicasToDirsRequest from broker {} references non assigned partition {}-{}", brokerId, topicName, partitionIndex); + partitionError = Errors.NOT_LEADER_OR_FOLLOWER; + } else { + Optional<ApiMessageAndVersion> partitionChangeRecord = new PartitionChangeBuilder( + partitionRegistration, + topicId, + partitionIndex, + new LeaderAcceptor(clusterControl, partitionRegistration), + featureControl.metadataVersion(), + getTopicEffectiveMinIsr(topicName) + ) + .setDirectory(brokerId, dirId) + .setDefaultDirProvider(clusterDescriber) + .build(); + partitionChangeRecord.ifPresent(records::add); + + if (!brokerRegistration.hasOnlineDir(dirId)) { Review Comment: Do we need to do this search for every partition? The result of `brokerRegistration.hasOnlineDir(dirId)` depends only on the directory, so I think we could compute it once per directory, above the topic loop, and reuse the result here. ########## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ########## @@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData listPartitionReassignments( return response; } + ControllerResult<AssignReplicasToDirsResponseData> handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) { + int brokerId = request.brokerId(); + clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch()); + BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId); Review Comment: Need to check `brokerRegistration` for `null` to ensure the broker is actually registered. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org