frankvicky commented on code in PR #20461:
URL: https://github.com/apache/kafka/pull/20461#discussion_r2322628958


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -3108,34 +3109,54 @@ public DescribeLogDirsRequest.Builder createRequest(int 
timeoutMs) {
                 @Override
                 public void handleResponse(AbstractResponse abstractResponse) {
                     DescribeLogDirsResponse response = 
(DescribeLogDirsResponse) abstractResponse;
+
+                    Set<TopicPartition> pendingPartitions = new 
HashSet<>(replicaDirInfoByPartition.keySet());
+                    Map<String, Throwable> directoryFailures = new HashMap<>();
+
                     for (Map.Entry<String, LogDirDescription> responseEntry : 
logDirDescriptions(response).entrySet()) {
                         String logDir = responseEntry.getKey();
                         LogDirDescription logDirInfo = 
responseEntry.getValue();
 
                         // No replica info will be provided if the log 
directory is offline
                         if (logDirInfo.error() instanceof 
KafkaStorageException)
                             continue;
-                        if (logDirInfo.error() != null)
-                            handleFailure(new IllegalStateException(
-                                "The error " + 
logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in 
the response from broker " + brokerId + " is illegal"));
-
-                        for (Map.Entry<TopicPartition, ReplicaInfo> 
replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
-                            TopicPartition tp = replicaInfoEntry.getKey();
-                            ReplicaInfo replicaInfo = 
replicaInfoEntry.getValue();
-                            ReplicaLogDirInfo replicaLogDirInfo = 
replicaDirInfoByPartition.get(tp);
-                            if (replicaLogDirInfo == null) {
-                                log.warn("Server response from broker {} 
mentioned unknown partition {}", brokerId, tp);
-                            } else if (replicaInfo.isFuture()) {
-                                replicaDirInfoByPartition.put(tp, new 
ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
-                                    
replicaLogDirInfo.getCurrentReplicaOffsetLag(),
-                                    logDir,
-                                    replicaInfo.offsetLag()));
-                            } else {
-                                replicaDirInfoByPartition.put(tp, new 
ReplicaLogDirInfo(logDir,
-                                    replicaInfo.offsetLag(),
-                                    replicaLogDirInfo.getFutureReplicaLogDir(),
-                                    
replicaLogDirInfo.getFutureReplicaOffsetLag()));
+                        if (logDirInfo.error() instanceof 
ClusterAuthorizationException)
+                            handleFailure(logDirInfo.error());
+
+                        if (logDirInfo.error() == null) {
+                            for (Map.Entry<TopicPartition, ReplicaInfo> 
replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
+                                TopicPartition tp = replicaInfoEntry.getKey();
+                                ReplicaInfo replicaInfo = 
replicaInfoEntry.getValue();
+                                ReplicaLogDirInfo replicaLogDirInfo = 
replicaDirInfoByPartition.get(tp);
+                                if (replicaLogDirInfo == null) {
+                                    log.warn("Server response from broker {} 
mentioned unknown partition {}", brokerId, tp);
+                                } else if (replicaInfo.isFuture()) {
+                                    replicaDirInfoByPartition.put(tp, new 
ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
+                                            
replicaLogDirInfo.getCurrentReplicaOffsetLag(),
+                                            logDir,
+                                            replicaInfo.offsetLag()));
+                                } else {
+                                    replicaDirInfoByPartition.put(tp, new 
ReplicaLogDirInfo(logDir,
+                                            replicaInfo.offsetLag(),
+                                            
replicaLogDirInfo.getFutureReplicaLogDir(),
+                                            
replicaLogDirInfo.getFutureReplicaOffsetLag()));
+                                }
+                                pendingPartitions.remove(tp);
                             }
+                        } else {
+                            directoryFailures.put(logDir, logDirInfo.error());
+                        }
+                    }
+
+                    if (!pendingPartitions.isEmpty() && 
!directoryFailures.isEmpty()) {
+                        ArrayList<String> errorAtDir = new ArrayList<>();

Review Comment:
   nit
   ```suggestion
                           List<String> errorAtDir = new ArrayList<>();
   ```



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -3108,34 +3109,54 @@ public DescribeLogDirsRequest.Builder createRequest(int 
timeoutMs) {
                 @Override
                 public void handleResponse(AbstractResponse abstractResponse) {
                     DescribeLogDirsResponse response = 
(DescribeLogDirsResponse) abstractResponse;
+
+                    Set<TopicPartition> pendingPartitions = new 
HashSet<>(replicaDirInfoByPartition.keySet());
+                    Map<String, Throwable> directoryFailures = new HashMap<>();
+
                     for (Map.Entry<String, LogDirDescription> responseEntry : 
logDirDescriptions(response).entrySet()) {
                         String logDir = responseEntry.getKey();
                         LogDirDescription logDirInfo = 
responseEntry.getValue();
 
                         // No replica info will be provided if the log 
directory is offline
                         if (logDirInfo.error() instanceof 
KafkaStorageException)
                             continue;
-                        if (logDirInfo.error() != null)
-                            handleFailure(new IllegalStateException(
-                                "The error " + 
logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in 
the response from broker " + brokerId + " is illegal"));
-
-                        for (Map.Entry<TopicPartition, ReplicaInfo> 
replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
-                            TopicPartition tp = replicaInfoEntry.getKey();
-                            ReplicaInfo replicaInfo = 
replicaInfoEntry.getValue();
-                            ReplicaLogDirInfo replicaLogDirInfo = 
replicaDirInfoByPartition.get(tp);
-                            if (replicaLogDirInfo == null) {
-                                log.warn("Server response from broker {} 
mentioned unknown partition {}", brokerId, tp);
-                            } else if (replicaInfo.isFuture()) {
-                                replicaDirInfoByPartition.put(tp, new 
ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
-                                    
replicaLogDirInfo.getCurrentReplicaOffsetLag(),
-                                    logDir,
-                                    replicaInfo.offsetLag()));
-                            } else {
-                                replicaDirInfoByPartition.put(tp, new 
ReplicaLogDirInfo(logDir,
-                                    replicaInfo.offsetLag(),
-                                    replicaLogDirInfo.getFutureReplicaLogDir(),
-                                    
replicaLogDirInfo.getFutureReplicaOffsetLag()));
+                        if (logDirInfo.error() instanceof 
ClusterAuthorizationException)
+                            handleFailure(logDirInfo.error());
+
+                        if (logDirInfo.error() == null) {
+                            for (Map.Entry<TopicPartition, ReplicaInfo> 
replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) {
+                                TopicPartition tp = replicaInfoEntry.getKey();
+                                ReplicaInfo replicaInfo = 
replicaInfoEntry.getValue();
+                                ReplicaLogDirInfo replicaLogDirInfo = 
replicaDirInfoByPartition.get(tp);
+                                if (replicaLogDirInfo == null) {
+                                    log.warn("Server response from broker {} 
mentioned unknown partition {}", brokerId, tp);
+                                } else if (replicaInfo.isFuture()) {
+                                    replicaDirInfoByPartition.put(tp, new 
ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(),
+                                            
replicaLogDirInfo.getCurrentReplicaOffsetLag(),
+                                            logDir,
+                                            replicaInfo.offsetLag()));
+                                } else {
+                                    replicaDirInfoByPartition.put(tp, new 
ReplicaLogDirInfo(logDir,
+                                            replicaInfo.offsetLag(),
+                                            
replicaLogDirInfo.getFutureReplicaLogDir(),
+                                            
replicaLogDirInfo.getFutureReplicaOffsetLag()));
+                                }
+                                pendingPartitions.remove(tp);
                             }
+                        } else {
+                            directoryFailures.put(logDir, logDirInfo.error());
+                        }
+                    }
+
+                    if (!pendingPartitions.isEmpty() && 
!directoryFailures.isEmpty()) {
+                        ArrayList<String> errorAtDir = new ArrayList<>();
+                        for (Map.Entry<String, Throwable> entry : 
directoryFailures.entrySet()) {
+                            
errorAtDir.add(entry.getValue().getClass().getName() + " at " + entry.getKey());
+                        }

Review Comment:
   nit:
   ```suggestion
                           directoryFailures.forEach((k, v) -> 
errorAtDir.add(v.getClass().getName() + " at " + k));
   ```



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -2517,6 +2517,55 @@ public void testDescribeReplicaLogDirsUnexpected() 
throws ExecutionException, In
         }
     }
 
+    @Test
+    public void testDescribeReplicaLogDirsWithAuthorizationException() throws 
ExecutionException, InterruptedException {
+        TopicPartitionReplica tpr = new TopicPartitionReplica("topic", 12, 1);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            String broker1log0 = "/var/data/kafka0";
+            env.kafkaClient().prepareResponseFrom(
+                    
prepareDescribeLogDirsResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, 
broker1log0),
+                    env.cluster().nodeById(tpr.brokerId()));
+
+            DescribeReplicaLogDirsResult result = 
env.adminClient().describeReplicaLogDirs(singletonList(tpr));
+            Map<TopicPartitionReplica, 
KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values = 
result.values();
+
+            Throwable e = assertThrows(Exception.class, () -> 
values.get(tpr).get());
+            assertInstanceOf(ClusterAuthorizationException.class, 
e.getCause());

Review Comment:
   Please use `TestUtils#assertFutureThrows` instead.



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -2517,6 +2517,55 @@ public void testDescribeReplicaLogDirsUnexpected() 
throws ExecutionException, In
         }
     }
 
+    @Test
+    public void testDescribeReplicaLogDirsWithAuthorizationException() throws 
ExecutionException, InterruptedException {
+        TopicPartitionReplica tpr = new TopicPartitionReplica("topic", 12, 1);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            String broker1log0 = "/var/data/kafka0";
+            env.kafkaClient().prepareResponseFrom(
+                    
prepareDescribeLogDirsResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, 
broker1log0),
+                    env.cluster().nodeById(tpr.brokerId()));
+
+            DescribeReplicaLogDirsResult result = 
env.adminClient().describeReplicaLogDirs(singletonList(tpr));
+            Map<TopicPartitionReplica, 
KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values = 
result.values();
+
+            Throwable e = assertThrows(Exception.class, () -> 
values.get(tpr).get());
+            assertInstanceOf(ClusterAuthorizationException.class, 
e.getCause());
+        }
+
+    }
+
+    @Test
+    public void testDescribeReplicaLogDirsWithSingleDirException() throws 
ExecutionException, InterruptedException {
+        int brokerId = 1;
+        TopicPartitionReplica successfulTpr = new 
TopicPartitionReplica("topic", 12, brokerId);
+        TopicPartitionReplica failedTpr = new TopicPartitionReplica("failed", 
12, brokerId);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            String broker1log0 = "/var/data/kafka0";
+            String broker1log1 = "/var/data/kafka1";
+            int broker1Log0PartitionSize = 987654321;
+            int broker1Log0OffsetLag = 24;
+
+            DescribeLogDirsResponseData.DescribeLogDirsResult successfulResult 
= prepareDescribeLogDirsResult(
+                    successfulTpr, broker1log0, broker1Log0PartitionSize, 
broker1Log0OffsetLag, false);
+            DescribeLogDirsResponseData.DescribeLogDirsResult failedResult = 
new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                    .setErrorCode(Errors.LOG_DIR_NOT_FOUND.code())
+                    .setLogDir(broker1log1);
+            DescribeLogDirsResponse response = new DescribeLogDirsResponse(new 
DescribeLogDirsResponseData().setResults(asList(successfulResult, 
failedResult)));
+            env.kafkaClient().prepareResponseFrom(response, 
env.cluster().nodeById(successfulTpr.brokerId()));
+
+            DescribeReplicaLogDirsResult result = 
env.adminClient().describeReplicaLogDirs(asList(successfulTpr, failedTpr));

Review Comment:
   List.of



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -2517,6 +2517,55 @@ public void testDescribeReplicaLogDirsUnexpected() 
throws ExecutionException, In
         }
     }
 
+    @Test
+    public void testDescribeReplicaLogDirsWithAuthorizationException() throws 
ExecutionException, InterruptedException {
+        TopicPartitionReplica tpr = new TopicPartitionReplica("topic", 12, 1);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            String broker1log0 = "/var/data/kafka0";
+            env.kafkaClient().prepareResponseFrom(
+                    
prepareDescribeLogDirsResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, 
broker1log0),
+                    env.cluster().nodeById(tpr.brokerId()));
+
+            DescribeReplicaLogDirsResult result = 
env.adminClient().describeReplicaLogDirs(singletonList(tpr));
+            Map<TopicPartitionReplica, 
KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values = 
result.values();
+
+            Throwable e = assertThrows(Exception.class, () -> 
values.get(tpr).get());
+            assertInstanceOf(ClusterAuthorizationException.class, 
e.getCause());
+        }
+
+    }
+
+    @Test
+    public void testDescribeReplicaLogDirsWithSingleDirException() throws 
ExecutionException, InterruptedException {
+        int brokerId = 1;
+        TopicPartitionReplica successfulTpr = new 
TopicPartitionReplica("topic", 12, brokerId);
+        TopicPartitionReplica failedTpr = new TopicPartitionReplica("failed", 
12, brokerId);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            String broker1log0 = "/var/data/kafka0";
+            String broker1log1 = "/var/data/kafka1";
+            int broker1Log0PartitionSize = 987654321;
+            int broker1Log0OffsetLag = 24;
+
+            DescribeLogDirsResponseData.DescribeLogDirsResult successfulResult 
= prepareDescribeLogDirsResult(
+                    successfulTpr, broker1log0, broker1Log0PartitionSize, 
broker1Log0OffsetLag, false);
+            DescribeLogDirsResponseData.DescribeLogDirsResult failedResult = 
new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                    .setErrorCode(Errors.LOG_DIR_NOT_FOUND.code())
+                    .setLogDir(broker1log1);
+            DescribeLogDirsResponse response = new DescribeLogDirsResponse(new 
DescribeLogDirsResponseData().setResults(asList(successfulResult, 
failedResult)));
+            env.kafkaClient().prepareResponseFrom(response, 
env.cluster().nodeById(successfulTpr.brokerId()));
+
+            DescribeReplicaLogDirsResult result = 
env.adminClient().describeReplicaLogDirs(asList(successfulTpr, failedTpr));
+            Map<TopicPartitionReplica, 
KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values = 
result.values();
+
+            assertNotNull(values.get(successfulTpr).get());
+            assertThrows(Exception.class, () -> values.get(failedTpr).get());

Review Comment:
   Do you think we should assert the class of this exception?



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -2517,6 +2517,55 @@ public void testDescribeReplicaLogDirsUnexpected() 
throws ExecutionException, In
         }
     }
 
+    @Test
+    public void testDescribeReplicaLogDirsWithAuthorizationException() throws 
ExecutionException, InterruptedException {
+        TopicPartitionReplica tpr = new TopicPartitionReplica("topic", 12, 1);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            String broker1log0 = "/var/data/kafka0";
+            env.kafkaClient().prepareResponseFrom(
+                    
prepareDescribeLogDirsResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, 
broker1log0),
+                    env.cluster().nodeById(tpr.brokerId()));
+
+            DescribeReplicaLogDirsResult result = 
env.adminClient().describeReplicaLogDirs(singletonList(tpr));

Review Comment:
   List.of



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -2517,6 +2517,55 @@ public void testDescribeReplicaLogDirsUnexpected() 
throws ExecutionException, In
         }
     }
 
+    @Test
+    public void testDescribeReplicaLogDirsWithAuthorizationException() throws 
ExecutionException, InterruptedException {
+        TopicPartitionReplica tpr = new TopicPartitionReplica("topic", 12, 1);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            String broker1log0 = "/var/data/kafka0";
+            env.kafkaClient().prepareResponseFrom(
+                    
prepareDescribeLogDirsResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, 
broker1log0),
+                    env.cluster().nodeById(tpr.brokerId()));
+
+            DescribeReplicaLogDirsResult result = 
env.adminClient().describeReplicaLogDirs(singletonList(tpr));
+            Map<TopicPartitionReplica, 
KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values = 
result.values();
+
+            Throwable e = assertThrows(Exception.class, () -> 
values.get(tpr).get());
+            assertInstanceOf(ClusterAuthorizationException.class, 
e.getCause());
+        }
+
+    }
+
+    @Test
+    public void testDescribeReplicaLogDirsWithSingleDirException() throws 
ExecutionException, InterruptedException {
+        int brokerId = 1;
+        TopicPartitionReplica successfulTpr = new 
TopicPartitionReplica("topic", 12, brokerId);
+        TopicPartitionReplica failedTpr = new TopicPartitionReplica("failed", 
12, brokerId);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            String broker1log0 = "/var/data/kafka0";
+            String broker1log1 = "/var/data/kafka1";
+            int broker1Log0PartitionSize = 987654321;
+            int broker1Log0OffsetLag = 24;
+
+            DescribeLogDirsResponseData.DescribeLogDirsResult successfulResult 
= prepareDescribeLogDirsResult(
+                    successfulTpr, broker1log0, broker1Log0PartitionSize, 
broker1Log0OffsetLag, false);
+            DescribeLogDirsResponseData.DescribeLogDirsResult failedResult = 
new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                    .setErrorCode(Errors.LOG_DIR_NOT_FOUND.code())
+                    .setLogDir(broker1log1);
+            DescribeLogDirsResponse response = new DescribeLogDirsResponse(new 
DescribeLogDirsResponseData().setResults(asList(successfulResult, 
failedResult)));

Review Comment:
   List.of



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to