rondagostino commented on code in PR #14863:
URL: https://github.com/apache/kafka/pull/14863#discussion_r1416193447


##########
core/src/test/scala/unit/kafka/server/ControllerApisTest.scala:
##########
@@ -1139,31 +1139,24 @@ class ControllerApisTest {
   }
 
   @Test
-  def testAssignReplicasToDirsReturnsUnsupportedVersion(): Unit = {
+  def testAssignReplicasToDirs(): Unit = {
     val controller = mock(classOf[Controller])
-    val controllerApis = createControllerApis(None, controller)
+    val authorizer = mock(classOf[Authorizer])
+    val controllerApis = createControllerApis(Some(authorizer), controller)
+
+    val request = new AssignReplicasToDirsRequest.Builder(new 
AssignReplicasToDirsRequestData()).build()
+
+    when(authorizer.authorize(any[RequestContext], 
ArgumentMatchers.eq(Collections.singletonList(new Action(
+      AclOperation.CLUSTER_ACTION,
+      new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, 
PatternType.LITERAL),
+      1, true, true
+    )))))
+      .thenReturn(Collections.singletonList(AuthorizationResult.ALLOWED))
+    when(controller.assignReplicasToDirs(any[ControllerRequestContext], 
ArgumentMatchers.eq(request.data)))
+      .thenThrow(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception())

Review Comment:
   The mocked call needs to return a future that fails, e.g. one completed via 
`completableFuture.completeExceptionally()`, as opposed to throwing an exception directly.



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData 
listPartitionReassignments(
         return response;
     }
 
+    ControllerResult<AssignReplicasToDirsResponseData> 
handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {
+        int brokerId = request.brokerId();
+        clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
+        BrokerRegistration brokerRegistration = 
clusterControl.brokerRegistrations().get(brokerId);
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        AssignReplicasToDirsResponseData response = new 
AssignReplicasToDirsResponseData();
+        Set<TopicIdPartition> leaderAndIsrUpdates = new HashSet<>();
+        for (AssignReplicasToDirsRequestData.DirectoryData reqDir : 
request.directories()) {
+            Uuid dirId = reqDir.id();
+            AssignReplicasToDirsResponseData.DirectoryData resDir = new 
AssignReplicasToDirsResponseData.DirectoryData().setId(dirId);
+            for (AssignReplicasToDirsRequestData.TopicData reqTopic : 
reqDir.topics()) {
+                Uuid topicId = reqTopic.topicId();
+                Errors topicError = Errors.NONE;
+                TopicControlInfo topicControl = this.topics.get(topicId);

Review Comment:
   nit: `topicControlInfo` or `topicInfo` might be a better name than 
`topicControl` -- get the "Info" in there somehow.



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData 
listPartitionReassignments(
         return response;
     }
 
+    ControllerResult<AssignReplicasToDirsResponseData> 
handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {
+        int brokerId = request.brokerId();
+        clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
+        BrokerRegistration brokerRegistration = 
clusterControl.brokerRegistrations().get(brokerId);
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        AssignReplicasToDirsResponseData response = new 
AssignReplicasToDirsResponseData();
+        Set<TopicIdPartition> leaderAndIsrUpdates = new HashSet<>();
+        for (AssignReplicasToDirsRequestData.DirectoryData reqDir : 
request.directories()) {
+            Uuid dirId = reqDir.id();
+            AssignReplicasToDirsResponseData.DirectoryData resDir = new 
AssignReplicasToDirsResponseData.DirectoryData().setId(dirId);
+            for (AssignReplicasToDirsRequestData.TopicData reqTopic : 
reqDir.topics()) {
+                Uuid topicId = reqTopic.topicId();
+                Errors topicError = Errors.NONE;
+                TopicControlInfo topicControl = this.topics.get(topicId);
+                if (topicControl == null) {
+                    log.warn("AssignReplicasToDirsRequest from broker {} 
references unknown topic ID {}", brokerId, topicId);
+                    topicError = Errors.UNKNOWN_TOPIC_ID;
+                }
+                AssignReplicasToDirsResponseData.TopicData resTopic = new 
AssignReplicasToDirsResponseData.TopicData().setTopicId(topicId);
+                for (AssignReplicasToDirsRequestData.PartitionData 
reqPartition : reqTopic.partitions()) {
+                    int partitionIndex = reqPartition.partitionIndex();
+                    Errors partitionError = topicError;
+                    if (topicError == Errors.NONE) {
+                        String topicName = topicControl.name;
+                        PartitionRegistration partitionRegistration = 
topicControl.parts.get(partitionIndex);
+                        if (partitionRegistration == null) {
+                            log.warn("AssignReplicasToDirsRequest from broker 
{} references unknown partition {}-{}", brokerId, topicName, partitionIndex);
+                            partitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION;
+                        } else if 
(!Replicas.contains(partitionRegistration.replicas, brokerId)) {
+                            log.warn("AssignReplicasToDirsRequest from broker 
{} references non assigned partition {}-{}", brokerId, topicName, 
partitionIndex);
+                            partitionError = Errors.NOT_LEADER_OR_FOLLOWER;
+                        } else {
+                            Optional<ApiMessageAndVersion> 
partitionChangeRecord = new PartitionChangeBuilder(
+                                    partitionRegistration,
+                                    topicId,
+                                    partitionIndex,
+                                    new LeaderAcceptor(clusterControl, 
partitionRegistration),
+                                    featureControl.metadataVersion(),
+                                    getTopicEffectiveMinIsr(topicName)
+                            )
+                                    .setDirectory(brokerId, dirId)
+                                    .setDefaultDirProvider(clusterDescriber)
+                                    .build();
+                            partitionChangeRecord.ifPresent(records::add);
+
+                            if (!brokerRegistration.hasOnlineDir(dirId)) {

Review Comment:
   Do we need to do this `hasOnlineDir(dirId)` search for every partition? I think 
we could just do it once above and then reuse the result here.



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -2019,6 +2021,70 @@ ListPartitionReassignmentsResponseData 
listPartitionReassignments(
         return response;
     }
 
+    ControllerResult<AssignReplicasToDirsResponseData> 
handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {
+        int brokerId = request.brokerId();
+        clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
+        BrokerRegistration brokerRegistration = 
clusterControl.brokerRegistrations().get(brokerId);

Review Comment:
   Need to check `brokerRegistration` for `null` to ensure the broker is actually registered.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to