mimaison commented on a change in pull request #8311:
URL: https://github.com/apache/kafka/pull/8311#discussion_r435119315



##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -477,6 +484,27 @@ public void testCreateTopics() throws Exception {
         }
     }
 
+    @Test
+    public void testCreateTopicsPartialResponse() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(body -> body instanceof 
CreateTopicsRequest,
+                    prepareCreateTopicsResponse("myTopic", Errors.NONE));
+            CreateTopicsResult topicsResult = env.adminClient().createTopics(
+                    asList(new NewTopic("myTopic", Collections.singletonMap(0, 
asList(0, 1, 2))),
+                           new NewTopic("myTopic2", 
Collections.singletonMap(0, asList(0, 1, 2)))),
+                    new CreateTopicsOptions().timeoutMs(10000));
+            topicsResult.values().get("myTopic").get();
+            try {
+                topicsResult.values().get("myTopic2").get();
+                fail("Expected an exception.");
+            } catch (ExecutionException e) {

Review comment:
       We can use `assertThrows()` here. Same below

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -2234,20 +2251,27 @@ public AlterReplicaLogDirsResult 
alterReplicaLogDirs(Map<TopicPartitionReplica,
                 @Override
                 public void handleResponse(AbstractResponse abstractResponse) {
                     AlterReplicaLogDirsResponse response = 
(AlterReplicaLogDirsResponse) abstractResponse;
-                    for (Map.Entry<TopicPartition, Errors> responseEntry: 
response.responses().entrySet()) {
-                        TopicPartition tp = responseEntry.getKey();
-                        Errors error = responseEntry.getValue();
-                        TopicPartitionReplica replica = new 
TopicPartitionReplica(tp.topic(), tp.partition(), brokerId);
-                        KafkaFutureImpl<Void> future = futures.get(replica);
-                        if (future == null) {
-                            handleFailure(new IllegalStateException(
-                                "The partition " + tp + " in the response from 
broker " + brokerId + " is not in the request"));
-                        } else if (error == Errors.NONE) {
-                            future.complete(null);
-                        } else {
-                            future.completeExceptionally(error.exception());
+                    for (AlterReplicaLogDirTopicResult topicResult: 
response.data().results()) {
+                        for (AlterReplicaLogDirPartitionResult 
partitionResult: topicResult.partitions()) {
+                            TopicPartitionReplica replica = new 
TopicPartitionReplica(
+                                    topicResult.topicName(), 
partitionResult.partitionIndex(), brokerId);
+                            KafkaFutureImpl<Void> future = 
futures.get(replica);
+                            if (future == null) {
+                                log.warn("The partition {} in the response 
from broker {}} is not in the request",

Review comment:
       There's an extra `}` 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to