This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
new 72e72e3537d KAFKA-16865: Add IncludeTopicAuthorizedOperations option
for DescribeTopicPartitionsRequest (#16136)
72e72e3537d is described below
commit 72e72e3537de863d90d437184cf3711887ac7e57
Author: Gantigmaa Selenge <[email protected]>
AuthorDate: Wed Jun 12 16:04:24 2024 +0100
KAFKA-16865: Add IncludeTopicAuthorizedOperations option for
DescribeTopicPartitionsRequest (#16136)
Reviewers: Mickael Maison <[email protected]>, Chia-Ping Tsai
<[email protected]>, Calvin Liu <[email protected]>, Andrew Schofield
<[email protected]>, Apoorv Mittal <[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 8 ++-
.../kafka/clients/admin/KafkaAdminClientTest.java | 73 ++++++++++++++++++++++
2 files changed, 78 insertions(+), 3 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 5c8d9ebb799..55646e27820 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2246,7 +2246,7 @@ public class KafkaAdminClient extends AdminClient {
continue;
}
- TopicDescription currentTopicDescription =
getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+ TopicDescription currentTopicDescription =
getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes,
options.includeAuthorizedOperations());
if (partiallyFinishedTopicDescription != null &&
partiallyFinishedTopicDescription.name().equals(topicName)) {
// Add the partitions for the cursor topic of the
previous batch.
@@ -2409,14 +2409,16 @@ public class KafkaAdminClient extends AdminClient {
private TopicDescription
getTopicDescriptionFromDescribeTopicsResponseTopic(
DescribeTopicPartitionsResponseTopic topic,
- Map<Integer, Node> nodes
+ Map<Integer, Node> nodes,
+ boolean includeAuthorizedOperations
) {
List<DescribeTopicPartitionsResponsePartition> partitionInfos =
topic.partitions();
List<TopicPartitionInfo> partitions = new
ArrayList<>(partitionInfos.size());
for (DescribeTopicPartitionsResponsePartition partitionInfo :
partitionInfos) {
partitions.add(DescribeTopicPartitionsResponse.partitionToTopicPartitionInfo(partitionInfo,
nodes));
}
- return new TopicDescription(topic.name(), topic.isInternal(),
partitions, validAclOperations(topic.topicAuthorizedOperations()),
topic.topicId());
+ Set<AclOperation> authorisedOperations = includeAuthorizedOperations ?
validAclOperations(topic.topicAuthorizedOperations()) : null;
+ return new TopicDescription(topic.name(), topic.isInternal(),
partitions, authorisedOperations, topic.topicId());
}
private TopicDescription getTopicDescriptionFromCluster(Cluster cluster,
String topicName, Uuid topicId,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index ea1305e533b..e04de635f71 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -1464,6 +1464,7 @@ public class KafkaAdminClientTest {
assertEquals(0,
topicDescription.partitions().get(0).partition());
assertEquals(1,
topicDescription.partitions().get(1).partition());
topicDescription = topicDescriptions.get(topicName1);
+ assertNull(topicDescription.authorizedOperations());
assertEquals(1, topicDescription.partitions().size());
} catch (Exception e) {
fail("describe using DescribeTopics API should not fail", e);
@@ -1471,6 +1472,77 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testDescribeTopicPartitionsApiWithAuthorizedOps() throws
ExecutionException, InterruptedException {
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ String topicName0 = "test-0";
+ Uuid topicId = Uuid.randomUuid();
+
+ int authorisedOperations =
Utils.to32BitField(Utils.mkSet(AclOperation.DESCRIBE.code(),
AclOperation.ALTER.code()));
+ env.kafkaClient().prepareResponse(
+ prepareDescribeClusterResponse(0,
+ env.cluster().nodes(),
+ env.cluster().clusterResource().clusterId(),
+ 2,
+ authorisedOperations)
+ );
+
+ DescribeTopicPartitionsResponseData responseData = new
DescribeTopicPartitionsResponseData();
+ responseData.topics().add(new
DescribeTopicPartitionsResponseTopic()
+ .setErrorCode((short) 0)
+ .setTopicId(topicId)
+ .setName(topicName0)
+ .setIsInternal(false)
+ .setTopicAuthorizedOperations(authorisedOperations));
+ env.kafkaClient().prepareResponse(new
DescribeTopicPartitionsResponse(responseData));
+
+ DescribeTopicsResult result = env.adminClient().describeTopics(
+ singletonList(topicName0), new
DescribeTopicsOptions().includeAuthorizedOperations(true)
+ );
+
+ Map<String, TopicDescription> topicDescriptions =
result.allTopicNames().get();
+ TopicDescription topicDescription =
topicDescriptions.get(topicName0);
+ assertEquals(new HashSet<>(asList(AclOperation.DESCRIBE,
AclOperation.ALTER)),
+ topicDescription.authorizedOperations());
+ }
+ }
+
+ @Test
+ public void testDescribeTopicPartitionsApiWithoutAuthorizedOps() throws
ExecutionException, InterruptedException {
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ String topicName0 = "test-0";
+ Uuid topicId = Uuid.randomUuid();
+
+ int authorisedOperations =
Utils.to32BitField(Utils.mkSet(AclOperation.DESCRIBE.code(),
AclOperation.ALTER.code()));
+ env.kafkaClient().prepareResponse(
+ prepareDescribeClusterResponse(0,
+ env.cluster().nodes(),
+ env.cluster().clusterResource().clusterId(),
+ 2,
+ authorisedOperations)
+ );
+
+ DescribeTopicPartitionsResponseData responseData = new
DescribeTopicPartitionsResponseData();
+ responseData.topics().add(new
DescribeTopicPartitionsResponseTopic()
+ .setErrorCode((short) 0)
+ .setTopicId(topicId)
+ .setName(topicName0)
+ .setIsInternal(false)
+ .setTopicAuthorizedOperations(authorisedOperations));
+ env.kafkaClient().prepareResponse(new
DescribeTopicPartitionsResponse(responseData));
+
+ DescribeTopicsResult result = env.adminClient().describeTopics(
+ singletonList(topicName0), new
DescribeTopicsOptions().includeAuthorizedOperations(false)
+ );
+
+ Map<String, TopicDescription> topicDescriptions =
result.allTopicNames().get();
+ TopicDescription topicDescription =
topicDescriptions.get(topicName0);
+ assertNull(topicDescription.authorizedOperations());
+ }
+ }
+
@SuppressWarnings({"NPathComplexity", "CyclomaticComplexity"})
@Test
public void testDescribeTopicsWithDescribeTopicPartitionsApiEdgeCase() {
@@ -1547,6 +1619,7 @@ public class KafkaAdminClientTest {
assertEquals(2, topicDescription.partitions().size());
topicDescription = topicDescriptions.get(topicName2);
assertEquals(2, topicDescription.partitions().size());
+ assertNull(topicDescription.authorizedOperations());
} catch (Exception e) {
fail("describe using DescribeTopics API should not fail", e);
}