frankvicky commented on code in PR #20461: URL: https://github.com/apache/kafka/pull/20461#discussion_r2322628958
########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -3108,34 +3109,54 @@ public DescribeLogDirsRequest.Builder createRequest(int timeoutMs) { @Override public void handleResponse(AbstractResponse abstractResponse) { DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse; + + Set<TopicPartition> pendingPartitions = new HashSet<>(replicaDirInfoByPartition.keySet()); + Map<String, Throwable> directoryFailures = new HashMap<>(); + for (Map.Entry<String, LogDirDescription> responseEntry : logDirDescriptions(response).entrySet()) { String logDir = responseEntry.getKey(); LogDirDescription logDirInfo = responseEntry.getValue(); // No replica info will be provided if the log directory is offline if (logDirInfo.error() instanceof KafkaStorageException) continue; - if (logDirInfo.error() != null) - handleFailure(new IllegalStateException( - "The error " + logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal")); - - for (Map.Entry<TopicPartition, ReplicaInfo> replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) { - TopicPartition tp = replicaInfoEntry.getKey(); - ReplicaInfo replicaInfo = replicaInfoEntry.getValue(); - ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp); - if (replicaLogDirInfo == null) { - log.warn("Server response from broker {} mentioned unknown partition {}", brokerId, tp); - } else if (replicaInfo.isFuture()) { - replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(), - replicaLogDirInfo.getCurrentReplicaOffsetLag(), - logDir, - replicaInfo.offsetLag())); - } else { - replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(logDir, - replicaInfo.offsetLag(), - replicaLogDirInfo.getFutureReplicaLogDir(), - replicaLogDirInfo.getFutureReplicaOffsetLag())); + if (logDirInfo.error() instanceof ClusterAuthorizationException) + handleFailure(logDirInfo.error()); + + if (logDirInfo.error() == null) { + for (Map.Entry<TopicPartition, ReplicaInfo> replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) { + TopicPartition tp = replicaInfoEntry.getKey(); + ReplicaInfo replicaInfo = replicaInfoEntry.getValue(); + ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp); + if (replicaLogDirInfo == null) { + log.warn("Server response from broker {} mentioned unknown partition {}", brokerId, tp); + } else if (replicaInfo.isFuture()) { + replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(), + replicaLogDirInfo.getCurrentReplicaOffsetLag(), + logDir, + replicaInfo.offsetLag())); + } else { + replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(logDir, + replicaInfo.offsetLag(), + replicaLogDirInfo.getFutureReplicaLogDir(), + replicaLogDirInfo.getFutureReplicaOffsetLag())); + } + pendingPartitions.remove(tp); } + } else { + directoryFailures.put(logDir, logDirInfo.error()); + } + } + + if (!pendingPartitions.isEmpty() && !directoryFailures.isEmpty()) { + ArrayList<String> errorAtDir = new ArrayList<>(); Review Comment: nit ```suggestion List<String> errorAtDir = new ArrayList<>(); ``` ########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -3108,34 +3109,54 @@ public DescribeLogDirsRequest.Builder createRequest(int timeoutMs) { @Override public void handleResponse(AbstractResponse abstractResponse) { DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse; + + Set<TopicPartition> pendingPartitions = new HashSet<>(replicaDirInfoByPartition.keySet()); + Map<String, Throwable> directoryFailures = new HashMap<>(); + for (Map.Entry<String, LogDirDescription> responseEntry : logDirDescriptions(response).entrySet()) { String logDir = responseEntry.getKey(); LogDirDescription logDirInfo = responseEntry.getValue(); // No replica info will be provided if the log directory is offline if (logDirInfo.error() instanceof KafkaStorageException) continue; - if (logDirInfo.error() != null) - handleFailure(new IllegalStateException( - "The error " + logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in the response from broker " + brokerId + " is illegal")); - - for (Map.Entry<TopicPartition, ReplicaInfo> replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) { - TopicPartition tp = replicaInfoEntry.getKey(); - ReplicaInfo replicaInfo = replicaInfoEntry.getValue(); - ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp); - if (replicaLogDirInfo == null) { - log.warn("Server response from broker {} mentioned unknown partition {}", brokerId, tp); - } else if (replicaInfo.isFuture()) { - replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(), - replicaLogDirInfo.getCurrentReplicaOffsetLag(), - logDir, - replicaInfo.offsetLag())); - } else { - replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(logDir, - replicaInfo.offsetLag(), - replicaLogDirInfo.getFutureReplicaLogDir(), - replicaLogDirInfo.getFutureReplicaOffsetLag())); + if (logDirInfo.error() instanceof ClusterAuthorizationException) + handleFailure(logDirInfo.error()); + + if (logDirInfo.error() == null) { + for (Map.Entry<TopicPartition, ReplicaInfo> replicaInfoEntry : logDirInfo.replicaInfos().entrySet()) { + TopicPartition tp = replicaInfoEntry.getKey(); + ReplicaInfo replicaInfo = replicaInfoEntry.getValue(); + ReplicaLogDirInfo replicaLogDirInfo = replicaDirInfoByPartition.get(tp); + if (replicaLogDirInfo == null) { + log.warn("Server response from broker {} mentioned unknown partition {}", brokerId, tp); + } else if (replicaInfo.isFuture()) { + replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(replicaLogDirInfo.getCurrentReplicaLogDir(), + replicaLogDirInfo.getCurrentReplicaOffsetLag(), + logDir, + replicaInfo.offsetLag())); + } else { + replicaDirInfoByPartition.put(tp, new ReplicaLogDirInfo(logDir, + replicaInfo.offsetLag(), + replicaLogDirInfo.getFutureReplicaLogDir(), + replicaLogDirInfo.getFutureReplicaOffsetLag())); + } + pendingPartitions.remove(tp); } + } else { + directoryFailures.put(logDir, logDirInfo.error()); + } + } + + if (!pendingPartitions.isEmpty() && !directoryFailures.isEmpty()) { + ArrayList<String> errorAtDir = new ArrayList<>(); + for (Map.Entry<String, Throwable> entry : directoryFailures.entrySet()) { + errorAtDir.add(entry.getValue().getClass().getName() + " at " + entry.getKey()); + } Review Comment: nit: ```suggestion directoryFailures.forEach((k, v) -> errorAtDir.add(v.getClass().getName() + " at " + k)); ``` ########## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ########## @@ -2517,6 +2517,55 @@ public void testDescribeReplicaLogDirsUnexpected() throws ExecutionException, In } } + @Test + public void testDescribeReplicaLogDirsWithAuthorizationException() throws ExecutionException, InterruptedException { + TopicPartitionReplica tpr = new TopicPartitionReplica("topic", 12, 1); + + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + String broker1log0 = "/var/data/kafka0"; + env.kafkaClient().prepareResponseFrom( + prepareDescribeLogDirsResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, broker1log0), + env.cluster().nodeById(tpr.brokerId())); + + DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(singletonList(tpr)); + Map<TopicPartitionReplica, KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values = result.values(); + + Throwable e = assertThrows(Exception.class, () -> values.get(tpr).get()); + assertInstanceOf(ClusterAuthorizationException.class, e.getCause()); Review Comment: Please use `TestUtils#assertFutureThrows` instead. ########## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ########## @@ -2517,6 +2517,55 @@ public void testDescribeReplicaLogDirsUnexpected() throws ExecutionException, In } } + @Test + public void testDescribeReplicaLogDirsWithAuthorizationException() throws ExecutionException, InterruptedException { + TopicPartitionReplica tpr = new TopicPartitionReplica("topic", 12, 1); + + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + String broker1log0 = "/var/data/kafka0"; + env.kafkaClient().prepareResponseFrom( + prepareDescribeLogDirsResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, broker1log0), + env.cluster().nodeById(tpr.brokerId())); + + DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(singletonList(tpr)); + Map<TopicPartitionReplica, KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values = result.values(); + + Throwable e = assertThrows(Exception.class, () -> values.get(tpr).get()); + assertInstanceOf(ClusterAuthorizationException.class, e.getCause()); + } + + } + + @Test + public void testDescribeReplicaLogDirsWithSingleDirException() throws ExecutionException, InterruptedException { + int brokerId = 1; + TopicPartitionReplica successfulTpr = new TopicPartitionReplica("topic", 12, brokerId); + TopicPartitionReplica failedTpr = new TopicPartitionReplica("failed", 12, brokerId); + + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + String broker1log0 = "/var/data/kafka0"; + String broker1log1 = "/var/data/kafka1"; + int broker1Log0PartitionSize = 987654321; + int broker1Log0OffsetLag = 24; + + DescribeLogDirsResponseData.DescribeLogDirsResult successfulResult = prepareDescribeLogDirsResult( + successfulTpr, broker1log0, broker1Log0PartitionSize, broker1Log0OffsetLag, false); + DescribeLogDirsResponseData.DescribeLogDirsResult failedResult = new DescribeLogDirsResponseData.DescribeLogDirsResult() + .setErrorCode(Errors.LOG_DIR_NOT_FOUND.code()) + .setLogDir(broker1log1); + DescribeLogDirsResponse response = new DescribeLogDirsResponse(new DescribeLogDirsResponseData().setResults(asList(successfulResult, failedResult))); + env.kafkaClient().prepareResponseFrom(response, env.cluster().nodeById(successfulTpr.brokerId())); + + DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(asList(successfulTpr, failedTpr)); Review Comment: List.of ########## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ########## @@ -2517,6 +2517,55 @@ public void testDescribeReplicaLogDirsUnexpected() throws ExecutionException, In } } + @Test + public void testDescribeReplicaLogDirsWithAuthorizationException() throws ExecutionException, InterruptedException { + TopicPartitionReplica tpr = new TopicPartitionReplica("topic", 12, 1); + + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + String broker1log0 = "/var/data/kafka0"; + env.kafkaClient().prepareResponseFrom( + prepareDescribeLogDirsResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, broker1log0), + env.cluster().nodeById(tpr.brokerId())); + + DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(singletonList(tpr)); + Map<TopicPartitionReplica, KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values = result.values(); + + Throwable e = assertThrows(Exception.class, () -> values.get(tpr).get()); + assertInstanceOf(ClusterAuthorizationException.class, e.getCause()); + } + + } + + @Test + public void testDescribeReplicaLogDirsWithSingleDirException() throws ExecutionException, InterruptedException { + int brokerId = 1; + TopicPartitionReplica successfulTpr = new TopicPartitionReplica("topic", 12, brokerId); + TopicPartitionReplica failedTpr = new TopicPartitionReplica("failed", 12, brokerId); + + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + String broker1log0 = "/var/data/kafka0"; + String broker1log1 = "/var/data/kafka1"; + int broker1Log0PartitionSize = 987654321; + int broker1Log0OffsetLag = 24; + + DescribeLogDirsResponseData.DescribeLogDirsResult successfulResult = prepareDescribeLogDirsResult( + successfulTpr, broker1log0, broker1Log0PartitionSize, broker1Log0OffsetLag, false); + DescribeLogDirsResponseData.DescribeLogDirsResult failedResult = new DescribeLogDirsResponseData.DescribeLogDirsResult() + .setErrorCode(Errors.LOG_DIR_NOT_FOUND.code()) + .setLogDir(broker1log1); + DescribeLogDirsResponse response = new DescribeLogDirsResponse(new DescribeLogDirsResponseData().setResults(asList(successfulResult, failedResult))); + env.kafkaClient().prepareResponseFrom(response, env.cluster().nodeById(successfulTpr.brokerId())); + + DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(asList(successfulTpr, failedTpr)); + Map<TopicPartitionReplica, KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values = result.values(); + + assertNotNull(values.get(successfulTpr).get()); + assertThrows(Exception.class, () -> values.get(failedTpr).get()); Review Comment: Do you think we should assert the class of this exception? ########## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ########## @@ -2517,6 +2517,55 @@ public void testDescribeReplicaLogDirsUnexpected() throws ExecutionException, In } } + @Test + public void testDescribeReplicaLogDirsWithAuthorizationException() throws ExecutionException, InterruptedException { + TopicPartitionReplica tpr = new TopicPartitionReplica("topic", 12, 1); + + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + String broker1log0 = "/var/data/kafka0"; + env.kafkaClient().prepareResponseFrom( + prepareDescribeLogDirsResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, broker1log0), + env.cluster().nodeById(tpr.brokerId())); + + DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(singletonList(tpr)); Review Comment: List.of ########## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ########## @@ -2517,6 +2517,55 @@ public void testDescribeReplicaLogDirsUnexpected() throws ExecutionException, In } } + @Test + public void testDescribeReplicaLogDirsWithAuthorizationException() throws ExecutionException, InterruptedException { + TopicPartitionReplica tpr = new TopicPartitionReplica("topic", 12, 1); + + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + String broker1log0 = "/var/data/kafka0"; + env.kafkaClient().prepareResponseFrom( + prepareDescribeLogDirsResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, broker1log0), + env.cluster().nodeById(tpr.brokerId())); + + DescribeReplicaLogDirsResult result = env.adminClient().describeReplicaLogDirs(singletonList(tpr)); + Map<TopicPartitionReplica, KafkaFuture<DescribeReplicaLogDirsResult.ReplicaLogDirInfo>> values = result.values(); + + Throwable e = assertThrows(Exception.class, () -> values.get(tpr).get()); + assertInstanceOf(ClusterAuthorizationException.class, e.getCause()); + } + + } + + @Test + public void testDescribeReplicaLogDirsWithSingleDirException() throws ExecutionException, InterruptedException { + int brokerId = 1; + TopicPartitionReplica successfulTpr = new TopicPartitionReplica("topic", 12, brokerId); + TopicPartitionReplica failedTpr = new TopicPartitionReplica("failed", 12, brokerId); + + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + String broker1log0 = "/var/data/kafka0"; + String broker1log1 = "/var/data/kafka1"; + int broker1Log0PartitionSize = 987654321; + int broker1Log0OffsetLag = 24; + + DescribeLogDirsResponseData.DescribeLogDirsResult successfulResult = prepareDescribeLogDirsResult( + successfulTpr, broker1log0, broker1Log0PartitionSize, broker1Log0OffsetLag, false); + DescribeLogDirsResponseData.DescribeLogDirsResult failedResult = new DescribeLogDirsResponseData.DescribeLogDirsResult() + .setErrorCode(Errors.LOG_DIR_NOT_FOUND.code()) + .setLogDir(broker1log1); + DescribeLogDirsResponse response = new DescribeLogDirsResponse(new DescribeLogDirsResponseData().setResults(asList(successfulResult, failedResult))); Review Comment: List.of -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org