OmniaGM commented on code in PR #14790: URL: https://github.com/apache/kafka/pull/14790#discussion_r1400641749
##########
core/src/main/java/kafka/server/AssignmentsManager.java:
##########
@@ -336,6 +356,27 @@ private static boolean responseIsError(ClientResponse response) {
         return false;
     }
+    private static void applyCallbackOnComplete(
+            AssignReplicasToDirsResponseData data,
+            Map<TopicIdPartition, AssignmentEvent> sent) {
+        for (AssignReplicasToDirsResponseData.DirectoryData directory : data.directories()) {
+            for (AssignReplicasToDirsResponseData.TopicData topic : directory.topics()) {
+                for (AssignReplicasToDirsResponseData.PartitionData partition : topic.partitions()) {
+                    TopicIdPartition topicPartition = new TopicIdPartition(topic.topicId(), partition.partitionIndex());
+                    AssignmentEvent event = sent.get(topicPartition);
+                    if (event == null) {
+                        log.error("AssignReplicasToDirsResponse contains unexpected partition {} into directory {}. No callback to apply.", partition, directory.id());
+                    } else {
+                        Errors error = Errors.forCode(partition.errorCode());
+                        if (error == Errors.NONE && event.callback != null) {
+                            event.callback.accept(DirectoryEventRequestState.COMPLETED);
+                        }

Review Comment:
good point, updated the code

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org