OmniaGM commented on code in PR #14790:
URL: https://github.com/apache/kafka/pull/14790#discussion_r1400641749


##########
core/src/main/java/kafka/server/AssignmentsManager.java:
##########
@@ -336,6 +356,27 @@ private static boolean responseIsError(ClientResponse 
response) {
         return false;
     }
 
+    private static void applyCallbackOnComplete(
+            AssignReplicasToDirsResponseData data,
+            Map<TopicIdPartition, AssignmentEvent> sent) {
+        for (AssignReplicasToDirsResponseData.DirectoryData directory : 
data.directories()) {
+            for (AssignReplicasToDirsResponseData.TopicData topic : 
directory.topics()) {
+                for (AssignReplicasToDirsResponseData.PartitionData partition 
: topic.partitions()) {
+                    TopicIdPartition topicPartition = new 
TopicIdPartition(topic.topicId(), partition.partitionIndex());
+                    AssignmentEvent event = sent.get(topicPartition);
+                    if (event == null) {
+                        log.error("AssignReplicasToDirsResponse contains 
unexpected partition {} into directory {}. No callback to apply.", partition, 
directory.id());
+                    } else {
+                        Errors error = Errors.forCode(partition.errorCode());
+                        if (error == Errors.NONE && event.callback != null) {
+                            
event.callback.accept(DirectoryEventRequestState.COMPLETED);
+                        }

Review Comment:
   good point, updated the code



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to