This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1d22b0d KAFKA-10774; Admin API for Describe topic using topic IDs
(#9769)
1d22b0d is described below
commit 1d22b0d70686aef5689b775ea2ea7610a37f3e8c
Author: dengziming <[email protected]>
AuthorDate: Sat Aug 28 16:00:36 2021 +0800
KAFKA-10774; Admin API for Describe topic using topic IDs (#9769)
Reviewers: Justine Olshan <[email protected]>, Chia-Ping Tsai
<[email protected]>, Satish Duggana <[email protected]>, Rajini Sivaram
<[email protected]>
---
.../java/org/apache/kafka/clients/admin/Admin.java | 44 +++++--
.../kafka/clients/admin/DeleteTopicsResult.java | 2 +-
.../kafka/clients/admin/DescribeTopicsResult.java | 97 +++++++++++++--
.../kafka/clients/admin/KafkaAdminClient.java | 127 ++++++++++++++++----
.../kafka/clients/admin/TopicDescription.java | 12 +-
.../apache/kafka/clients/admin/TopicListing.java | 28 ++++-
.../main/java/org/apache/kafka/common/Cluster.java | 8 ++
.../kafka/common/requests/MetadataRequest.java | 40 ++++++-
.../kafka/common/requests/MetadataResponse.java | 19 +++
.../resources/common/message/MetadataRequest.json | 3 +-
.../resources/common/message/MetadataResponse.json | 5 +-
.../kafka/clients/admin/AdminClientTestUtils.java | 7 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 4 +-
.../kafka/clients/admin/MockAdminClient.java | 100 +++++++++++++---
.../ClientAuthenticationFailureTest.java | 2 +-
.../connect/mirror/MirrorSourceConnector.java | 2 +-
.../org/apache/kafka/connect/util/TopicAdmin.java | 2 +-
.../apache/kafka/connect/util/TopicAdminTest.java | 2 +-
.../util/clusters/EmbeddedKafkaCluster.java | 2 +-
.../scala/kafka/admin/ConsumerGroupCommand.scala | 4 +-
.../kafka/admin/ReassignPartitionsCommand.scala | 4 +-
core/src/main/scala/kafka/admin/TopicCommand.scala | 133 +++++++++++++++------
core/src/main/scala/kafka/server/KafkaApis.scala | 36 +++++-
.../main/scala/kafka/server/MetadataCache.scala | 4 +
.../kafka/server/metadata/KRaftMetadataCache.scala | 4 +
.../kafka/server/metadata/ZkMetadataCache.scala | 8 ++
.../kafka/tools/ReplicaVerificationTool.scala | 2 +-
.../kafka/api/BaseAdminIntegrationTest.scala | 4 +-
.../api/DescribeAuthorizedOperationsTest.scala | 6 +-
.../kafka/api/EndToEndAuthorizationTest.scala | 8 +-
.../kafka/api/PlaintextAdminIntegrationTest.scala | 24 +++-
.../SaslClientsWithInvalidCredentialsTest.scala | 2 +-
.../server/DynamicBrokerReconfigurationTest.scala | 2 +-
.../kafka/server/KRaftClusterTest.scala | 2 +-
.../MetadataRequestBetweenDifferentIbpTest.scala | 96 +++++++++++++++
.../scala/unit/kafka/admin/DeleteTopicTest.scala | 2 +-
.../kafka/admin/TopicCommandIntegrationTest.scala | 18 +--
.../scala/unit/kafka/server/KafkaApisTest.scala | 111 +++++++++++++++++
.../test/scala/unit/kafka/utils/TestUtils.scala | 8 +-
.../TopicBasedRemoteLogMetadataManager.java | 2 +-
.../processor/internals/InternalTopicManager.java | 4 +-
.../KStreamRepartitionIntegrationTest.java | 2 +-
...bleJoinTopologyOptimizationIntegrationTest.java | 2 +-
.../internals/InternalTopicManagerTest.java | 8 +-
.../kafka/tools/ClientCompatibilityTest.java | 2 +-
.../apache/kafka/tools/TransactionsCommand.java | 2 +-
.../kafka/tools/TransactionsCommandTest.java | 2 +-
.../apache/kafka/trogdor/common/WorkerUtils.java | 4 +-
.../kafka/trogdor/common/WorkerUtilsTest.java | 6 +-
49 files changed, 846 insertions(+), 172 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 54d103b..4b6fe49 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -17,14 +17,6 @@
package org.apache.kafka.clients.admin;
-import java.time.Duration;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.KafkaFuture;
@@ -42,6 +34,14 @@ import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.requests.LeaveGroupResponse;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
/**
* The administrative client for Kafka, which supports managing and inspecting
topics, brokers, configurations and ACLs.
* <p>
@@ -303,7 +303,33 @@ public interface Admin extends AutoCloseable {
* @param options The options to use when describing the topic.
* @return The DescribeTopicsResult.
*/
- DescribeTopicsResult describeTopics(Collection<String> topicNames,
DescribeTopicsOptions options);
+ default DescribeTopicsResult describeTopics(Collection<String> topicNames,
DescribeTopicsOptions options) {
+ return describeTopics(TopicCollection.ofTopicNames(topicNames),
options);
+ }
+
+ /**
+ * This is a convenience method for {@link
#describeTopics(TopicCollection, DescribeTopicsOptions)}
+ * with default options. See the overload for more details.
+ * <p>
+ * When using topic IDs, this operation is supported by brokers with
version 3.1.0 or higher.
+ *
+ * @param topics The topics to describe.
+ * @return The DescribeTopicsResult.
+ */
+ default DescribeTopicsResult describeTopics(TopicCollection topics) {
+ return describeTopics(topics, new DescribeTopicsOptions());
+ }
+
+ /**
+ * Describe some topics in the cluster.
+ *
+ * When using topic IDs, this operation is supported by brokers with
version 3.1.0 or higher.
+ *
+ * @param topics The topics to describe.
+ * @param options The options to use when describing the topics.
+ * @return The DescribeTopicsResult.
+ */
+ DescribeTopicsResult describeTopics(TopicCollection topics,
DescribeTopicsOptions options);
/**
* Get information about the nodes in the cluster, using the default
options.
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
index b0d23d9..725b82a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
@@ -70,12 +70,12 @@ public class DeleteTopicsResult {
return nameFutures;
}
- @Deprecated
/**
* @return a map from topic names to futures which can be used to check
the status of
* individual deletions if the deleteTopics request used topic names.
Otherwise return null.
* @deprecated Since 3.0 use {@link #topicNameValues} instead
*/
+ @Deprecated
public Map<String, KafkaFuture<Void>> values() {
return nameFutures;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
index 7753984..41593c5 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
@@ -18,6 +18,8 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicCollection;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
@@ -32,28 +34,105 @@ import java.util.concurrent.ExecutionException;
*/
@InterfaceStability.Evolving
public class DescribeTopicsResult {
- private final Map<String, KafkaFuture<TopicDescription>> futures;
+ private final Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures;
+ private final Map<String, KafkaFuture<TopicDescription>> nameFutures;
+ @Deprecated
protected DescribeTopicsResult(Map<String, KafkaFuture<TopicDescription>>
futures) {
- this.futures = futures;
+ this(null, futures);
+ }
+
+ // VisibleForTesting
+ protected DescribeTopicsResult(Map<Uuid, KafkaFuture<TopicDescription>>
topicIdFutures, Map<String, KafkaFuture<TopicDescription>> nameFutures) {
+ if (topicIdFutures != null && nameFutures != null)
+ throw new IllegalArgumentException("topicIdFutures and nameFutures
cannot both be specified.");
+ if (topicIdFutures == null && nameFutures == null)
+ throw new IllegalArgumentException("topicIdFutures and nameFutures
cannot both be null.");
+ this.topicIdFutures = topicIdFutures;
+ this.nameFutures = nameFutures;
+ }
+
+ static DescribeTopicsResult ofTopicIds(Map<Uuid,
KafkaFuture<TopicDescription>> topicIdFutures) {
+ return new DescribeTopicsResult(topicIdFutures, null);
+ }
+
+ static DescribeTopicsResult ofTopicNames(Map<String,
KafkaFuture<TopicDescription>> nameFutures) {
+ return new DescribeTopicsResult(null, nameFutures);
+ }
+
+ /**
+ * Use when {@link Admin#describeTopics(TopicCollection,
DescribeTopicsOptions)} used a TopicIdCollection
+ *
+ * @return a map from topic IDs to futures which can be used to check the
status of
+ * individual topics if the request used topic IDs, otherwise
return null.
+ */
+ public Map<Uuid, KafkaFuture<TopicDescription>> topicIdValues() {
+ return topicIdFutures;
+ }
+
+ /**
+ * Use when {@link Admin#describeTopics(TopicCollection,
DescribeTopicsOptions)} used a TopicNameCollection
+ *
+ * @return a map from topic names to futures which can be used to check
the status of
+ * individual topics if the request used topic names, otherwise
return null.
+ */
+ public Map<String, KafkaFuture<TopicDescription>> topicNameValues() {
+ return nameFutures;
}
/**
- * Return a map from topic names to futures which can be used to check the
status of
- * individual topics.
+ * @return a map from topic names to futures which can be used to check
the status of
+ * individual topics if the request used topic names, otherwise
return null.
+ *
+ * @deprecated Since 3.1.0 use {@link #topicNameValues} instead
*/
+ @Deprecated
public Map<String, KafkaFuture<TopicDescription>> values() {
- return futures;
+ return nameFutures;
}
/**
- * Return a future which succeeds only if all the topic descriptions
succeed.
+     * @return A future map from topic names to descriptions which can be used
to check
+     *         the status of individual descriptions if the describe topic
request used
+     *         topic names; otherwise return null. This request succeeds only
if all the
+     *         topic descriptions succeed
+ *
+ * @deprecated Since 3.1.0 use {@link #allTopicNames()} instead
*/
+ @Deprecated
public KafkaFuture<Map<String, TopicDescription>> all() {
- return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+ return all(nameFutures);
+ }
+
+ /**
+     * @return A future map from topic names to descriptions which can be used
to check
+     *         the status of individual descriptions if the describe topic
request used
+     *         topic names; otherwise return null. This request succeeds only
if all the
+     *         topic descriptions succeed
+ */
+ public KafkaFuture<Map<String, TopicDescription>> allTopicNames() {
+ return all(nameFutures);
+ }
+
+ /**
+     * @return A future map from topic IDs to descriptions which can be used
to check the
+     *         status of individual descriptions if the describe topic request
used topic
+     *         IDs; otherwise return null. This request succeeds only if all
the topic
+     *         descriptions succeed
+ */
+ public KafkaFuture<Map<Uuid, TopicDescription>> allTopicIds() {
+ return all(topicIdFutures);
+ }
+
+ /**
+ * Return a future which succeeds only if all the topic descriptions
succeed.
+ */
+ private static <T> KafkaFuture<Map<T, TopicDescription>> all(Map<T,
KafkaFuture<TopicDescription>> futures) {
+ KafkaFuture<Void> future =
KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+ return future.
thenApply(v -> {
- Map<String, TopicDescription> descriptions = new
HashMap<>(futures.size());
- for (Map.Entry<String, KafkaFuture<TopicDescription>> entry :
futures.entrySet()) {
+ Map<T, TopicDescription> descriptions = new
HashMap<>(futures.size());
+ for (Map.Entry<T, KafkaFuture<TopicDescription>> entry :
futures.entrySet()) {
try {
descriptions.put(entry.getKey(),
entry.getValue().get());
} catch (InterruptedException | ExecutionException e) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 981598f..4fae1cc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -271,6 +271,7 @@ import static
org.apache.kafka.common.message.ListPartitionReassignmentsRequestD
import static
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
import static
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
import static
org.apache.kafka.common.requests.MetadataRequest.convertToMetadataRequestTopic;
+import static
org.apache.kafka.common.requests.MetadataRequest.convertTopicIdsToMetadataRequestTopic;
import static org.apache.kafka.common.utils.Utils.closeQuietly;
/**
@@ -1497,6 +1498,10 @@ public class KafkaAdminClient extends AdminClient {
return topicName == null || topicName.isEmpty();
}
+ private static boolean topicIdIsUnrepresentable(Uuid topicId) {
+ return topicId == null || topicId == Uuid.ZERO_UUID;
+ }
+
// for testing
int numPendingCalls() {
return runnable.pendingCalls.size();
@@ -1884,7 +1889,7 @@ public class KafkaAdminClient extends AdminClient {
String topicName = topicMetadata.topic();
boolean isInternal = topicMetadata.isInternal();
if (!topicMetadata.isInternal() ||
options.shouldListInternal())
- topicListing.put(topicName, new
TopicListing(topicName, isInternal));
+ topicListing.put(topicName, new
TopicListing(topicName, topicMetadata.topicId(), isInternal));
}
topicListingFuture.complete(topicListing);
}
@@ -1898,7 +1903,16 @@ public class KafkaAdminClient extends AdminClient {
}
@Override
- public DescribeTopicsResult describeTopics(final Collection<String>
topicNames, DescribeTopicsOptions options) {
+ public DescribeTopicsResult describeTopics(final TopicCollection topics,
DescribeTopicsOptions options) {
+ if (topics instanceof TopicIdCollection)
+ return
DescribeTopicsResult.ofTopicIds(handleDescribeTopicsByIds(((TopicIdCollection)
topics).topicIds(), options));
+ else if (topics instanceof TopicNameCollection)
+ return
DescribeTopicsResult.ofTopicNames(handleDescribeTopicsByNames(((TopicNameCollection)
topics).topicNames(), options));
+ else
+ throw new IllegalArgumentException("The TopicCollection: " +
topics + " provided did not match any supported classes for describeTopics.");
+ }
+
+ private Map<String, KafkaFuture<TopicDescription>>
handleDescribeTopicsByNames(final Collection<String> topicNames,
DescribeTopicsOptions options) {
final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures =
new HashMap<>(topicNames.size());
final ArrayList<String> topicNamesList = new ArrayList<>();
for (String topicName : topicNames) {
@@ -1947,28 +1961,13 @@ public class KafkaAdminClient extends AdminClient {
future.completeExceptionally(new
UnknownTopicOrPartitionException("Topic " + topicName + " not found."));
continue;
}
- boolean isInternal =
cluster.internalTopics().contains(topicName);
- List<PartitionInfo> partitionInfos =
cluster.partitionsForTopic(topicName);
- List<TopicPartitionInfo> partitions = new
ArrayList<>(partitionInfos.size());
- for (PartitionInfo partitionInfo : partitionInfos) {
- TopicPartitionInfo topicPartitionInfo = new
TopicPartitionInfo(
- partitionInfo.partition(), leader(partitionInfo),
Arrays.asList(partitionInfo.replicas()),
- Arrays.asList(partitionInfo.inSyncReplicas()));
- partitions.add(topicPartitionInfo);
- }
-
partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
- TopicDescription topicDescription = new
TopicDescription(topicName, isInternal, partitions,
-
validAclOperations(response.topicAuthorizedOperations(topicName).get()),
cluster.topicId(topicName));
+ Uuid topicId = cluster.topicId(topicName);
+ Integer authorizedOperations =
response.topicAuthorizedOperations(topicName).get();
+ TopicDescription topicDescription =
getTopicDescriptionFromCluster(cluster, topicName, topicId,
authorizedOperations);
future.complete(topicDescription);
}
}
- private Node leader(PartitionInfo partitionInfo) {
- if (partitionInfo.leader() == null ||
partitionInfo.leader().id() == Node.noNode().id())
- return null;
- return partitionInfo.leader();
- }
-
@Override
boolean
handleUnsupportedVersionException(UnsupportedVersionException exception) {
if (supportsDisablingTopicCreation) {
@@ -1986,7 +1985,93 @@ public class KafkaAdminClient extends AdminClient {
if (!topicNamesList.isEmpty()) {
runnable.call(call, now);
}
- return new DescribeTopicsResult(new HashMap<>(topicFutures));
+ return new HashMap<>(topicFutures);
+ }
+
+ private Map<Uuid, KafkaFuture<TopicDescription>>
handleDescribeTopicsByIds(Collection<Uuid> topicIds, DescribeTopicsOptions
options) {
+
+ final Map<Uuid, KafkaFutureImpl<TopicDescription>> topicFutures = new
HashMap<>(topicIds.size());
+ final List<Uuid> topicIdsList = new ArrayList<>();
+ for (Uuid topicId : topicIds) {
+ if (topicIdIsUnrepresentable(topicId)) {
+ KafkaFutureImpl<TopicDescription> future = new
KafkaFutureImpl<>();
+ future.completeExceptionally(new InvalidTopicException("The
given topic id '" +
+ topicId + "' cannot be represented in a request."));
+ topicFutures.put(topicId, future);
+ } else if (!topicFutures.containsKey(topicId)) {
+ topicFutures.put(topicId, new KafkaFutureImpl<>());
+ topicIdsList.add(topicId);
+ }
+ }
+ final long now = time.milliseconds();
+ Call call = new Call("describeTopicsWithIds", calcDeadlineMs(now,
options.timeoutMs()),
+ new LeastLoadedNodeProvider()) {
+
+ @Override
+ MetadataRequest.Builder createRequest(int timeoutMs) {
+ return new MetadataRequest.Builder(new MetadataRequestData()
+
.setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList))
+ .setAllowAutoTopicCreation(false)
+
.setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ MetadataResponse response = (MetadataResponse)
abstractResponse;
+ // Handle server responses for particular topics.
+ Cluster cluster = response.buildCluster();
+ Map<Uuid, Errors> errors = response.errorsByTopicId();
+ for (Map.Entry<Uuid, KafkaFutureImpl<TopicDescription>> entry
: topicFutures.entrySet()) {
+ Uuid topicId = entry.getKey();
+ KafkaFutureImpl<TopicDescription> future =
entry.getValue();
+
+ String topicName = cluster.topicName(topicId);
+ if (topicName == null) {
+ future.completeExceptionally(new
InvalidTopicException("TopicId " + topicId + " not found."));
+ continue;
+ }
+ Errors topicError = errors.get(topicId);
+ if (topicError != null) {
+ future.completeExceptionally(topicError.exception());
+ continue;
+ }
+
+ Integer authorizedOperations =
response.topicAuthorizedOperations(topicName).get();
+ TopicDescription topicDescription =
getTopicDescriptionFromCluster(cluster, topicName, topicId,
authorizedOperations);
+ future.complete(topicDescription);
+ }
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ completeAllExceptionally(topicFutures.values(), throwable);
+ }
+ };
+ if (!topicIdsList.isEmpty()) {
+ runnable.call(call, now);
+ }
+ return new HashMap<>(topicFutures);
+ }
+
+ private TopicDescription getTopicDescriptionFromCluster(Cluster cluster,
String topicName, Uuid topicId,
+ Integer
authorizedOperations) {
+ boolean isInternal = cluster.internalTopics().contains(topicName);
+ List<PartitionInfo> partitionInfos =
cluster.partitionsForTopic(topicName);
+ List<TopicPartitionInfo> partitions = new
ArrayList<>(partitionInfos.size());
+ for (PartitionInfo partitionInfo : partitionInfos) {
+ TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(
+ partitionInfo.partition(), leader(partitionInfo),
Arrays.asList(partitionInfo.replicas()),
+ Arrays.asList(partitionInfo.inSyncReplicas()));
+ partitions.add(topicPartitionInfo);
+ }
+
partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
+ return new TopicDescription(topicName, isInternal, partitions,
validAclOperations(authorizedOperations), topicId);
+ }
+
+ private Node leader(PartitionInfo partitionInfo) {
+ if (partitionInfo.leader() == null || partitionInfo.leader().id() ==
Node.noNode().id())
+ return null;
+ return partitionInfo.leader();
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
index a2d52e9..e8700d4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
@@ -72,13 +72,23 @@ public class TopicDescription {
* @param internal Whether the topic is internal to Kafka
* @param partitions A list of partitions where the index represents the
partition id and the element contains
* leadership and replica information for that partition.
- * @param authorizedOperations authorized operations for this topic, or
null if this is not known.
+ * @param authorizedOperations authorized operations for this topic, or
empty set if this is not known.
*/
public TopicDescription(String name, boolean internal,
List<TopicPartitionInfo> partitions,
Set<AclOperation> authorizedOperations) {
this(name, internal, partitions, authorizedOperations, Uuid.ZERO_UUID);
}
+ /**
+ * Create an instance with the specified parameters.
+ *
+ * @param name The topic name
+ * @param internal Whether the topic is internal to Kafka
+ * @param partitions A list of partitions where the index represents the
partition id and the element contains
+ * leadership and replica information for that partition.
+ * @param authorizedOperations authorized operations for this topic, or
empty set if this is not known.
+ * @param topicId the topic id
+ */
public TopicDescription(String name, boolean internal,
List<TopicPartitionInfo> partitions,
Set<AclOperation> authorizedOperations, Uuid
topicId) {
this.name = name;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
b/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
index e5124be..42ceeff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
@@ -17,11 +17,14 @@
package org.apache.kafka.clients.admin;
+import org.apache.kafka.common.Uuid;
+
/**
* A listing of a topic in the cluster.
*/
public class TopicListing {
private final String name;
+ private final Uuid topicId;
private final boolean internal;
/**
@@ -29,10 +32,33 @@ public class TopicListing {
*
* @param name The topic name
* @param internal Whether the topic is internal to Kafka
+     * @deprecated Since 3.1.0 use {@link #TopicListing(String, Uuid, boolean)}
instead
*/
+ @Deprecated
public TopicListing(String name, boolean internal) {
this.name = name;
this.internal = internal;
+ this.topicId = Uuid.ZERO_UUID;
+ }
+
+ /**
+ * Create an instance with the specified parameters.
+ *
+ * @param name The topic name
+ * @param topicId The topic id.
+ * @param internal Whether the topic is internal to Kafka
+ */
+ public TopicListing(String name, Uuid topicId, boolean internal) {
+ this.topicId = topicId;
+ this.name = name;
+ this.internal = internal;
+ }
+
+ /**
+ * The id of the topic.
+ */
+ public Uuid topicId() {
+ return topicId;
}
/**
@@ -52,6 +78,6 @@ public class TopicListing {
@Override
public String toString() {
- return "(name=" + name + ", internal=" + internal + ")";
+ return "(name=" + name + ", topicId=" + topicId + ", internal=" +
internal + ")";
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java
b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 8c9bbcb..7d3f6f0 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -47,6 +47,7 @@ public final class Cluster {
private final Map<Integer, Node> nodesById;
private final ClusterResource clusterResource;
private final Map<String, Uuid> topicIds;
+ private final Map<Uuid, String> topicNames;
/**
* Create a new cluster with the given id, nodes and partitions
@@ -184,6 +185,9 @@ public final class Cluster {
this.availablePartitionsByTopic =
Collections.unmodifiableMap(tmpAvailablePartitionsByTopic);
this.partitionsByNode =
Collections.unmodifiableMap(tmpPartitionsByNode);
this.topicIds = Collections.unmodifiableMap(topicIds);
+ Map<Uuid, String> tmpTopicNames = new HashMap<>();
+ topicIds.forEach((key, value) -> tmpTopicNames.put(value, key));
+ this.topicNames = Collections.unmodifiableMap(tmpTopicNames);
this.unauthorizedTopics =
Collections.unmodifiableSet(unauthorizedTopics);
this.invalidTopics = Collections.unmodifiableSet(invalidTopics);
@@ -354,6 +358,10 @@ public final class Cluster {
return topicIds.getOrDefault(topic, Uuid.ZERO_UUID);
}
+ public String topicName(Uuid topicId) {
+ return topicNames.get(topicId);
+ }
+
@Override
public String toString() {
return "Cluster(id = " + clusterResource.clusterId() + ", nodes = " +
this.nodes +
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index d38e9ac..aab5fc6 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -65,6 +65,20 @@ public class MetadataRequest extends AbstractRequest {
this(topics, allowAutoTopicCreation,
ApiKeys.METADATA.oldestVersion(), ApiKeys.METADATA.latestVersion());
}
+ public Builder(List<Uuid> topicIds) {
+ super(ApiKeys.METADATA, ApiKeys.METADATA.oldestVersion(),
ApiKeys.METADATA.latestVersion());
+ MetadataRequestData data = new MetadataRequestData();
+ if (topicIds == null)
+ data.setTopics(null);
+ else {
+ topicIds.forEach(topicId -> data.topics().add(new
MetadataRequestTopic().setTopicId(topicId)));
+ }
+
+            // It's impossible to create a topic with a topic ID
+ data.setAllowAutoTopicCreation(false);
+ this.data = data;
+ }
+
public static Builder allTopics() {
// This never causes auto-creation, but we set the boolean to true
because that is the default value when
// deserializing V2 and older. This way, the value is consistent
after serialization and deserialization.
@@ -95,10 +109,10 @@ public class MetadataRequest extends AbstractRequest {
"allowAutoTopicCreation field");
if (data.topics() != null) {
data.topics().forEach(topic -> {
- if (topic.name() == null)
+ if (topic.name() == null && version < 12)
throw new UnsupportedVersionException("MetadataRequest
version " + version +
" does not support null topic names.");
- if (topic.topicId() != Uuid.ZERO_UUID)
+ if (topic.topicId() != Uuid.ZERO_UUID && version < 12)
throw new UnsupportedVersionException("MetadataRequest
version " + version +
" does not support non-zero topic IDs.");
});
@@ -147,12 +161,12 @@ public class MetadataRequest extends AbstractRequest {
public boolean isAllTopics() {
return (data.topics() == null) ||
- (data.topics().isEmpty() && version() == 0); //In version 0, an
empty topic list indicates
+ (data.topics().isEmpty() && version() == 0); // In version 0, an
empty topic list indicates
// "request metadata
for all topics."
}
public List<String> topics() {
- if (isAllTopics()) //In version 0, we return null for empty topic list
+ if (isAllTopics()) // In version 0, we return null for empty topic list
return null;
else
return data.topics()
@@ -161,6 +175,18 @@ public class MetadataRequest extends AbstractRequest {
.collect(Collectors.toList());
}
+ public List<Uuid> topicIds() {
+ if (isAllTopics())
+ return Collections.emptyList();
+ else if (version() < 10)
+ return Collections.emptyList();
+ else
+ return data.topics()
+ .stream()
+ .map(MetadataRequestTopic::topicId)
+ .collect(Collectors.toList());
+ }
+
public boolean allowAutoTopicCreation() {
return data.allowAutoTopicCreation();
}
@@ -174,4 +200,10 @@ public class MetadataRequest extends AbstractRequest {
.setName(topic))
.collect(Collectors.toList());
}
+
+ public static List<MetadataRequestTopic>
convertTopicIdsToMetadataRequestTopic(final Collection<Uuid> topicIds) {
+ return topicIds.stream().map(topicId -> new MetadataRequestTopic()
+ .setTopicId(topicId))
+ .collect(Collectors.toList());
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index c85b31d..d539fa8 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -91,12 +91,31 @@ public class MetadataResponse extends AbstractResponse {
public Map<String, Errors> errors() {
Map<String, Errors> errors = new HashMap<>();
for (MetadataResponseTopic metadata : data.topics()) {
+ if (metadata.name() == null) {
+ throw new IllegalStateException("Use errorsByTopicId() when
managing topic using topic id");
+ }
if (metadata.errorCode() != Errors.NONE.code())
errors.put(metadata.name(),
Errors.forCode(metadata.errorCode()));
}
return errors;
}
+ /**
+ * Get a map of the topicIds which had metadata errors
+ * @return the map
+ */
+ public Map<Uuid, Errors> errorsByTopicId() {
+ Map<Uuid, Errors> errors = new HashMap<>();
+ for (MetadataResponseTopic metadata : data.topics()) {
+ if (metadata.topicId() == Uuid.ZERO_UUID) {
+ throw new IllegalStateException("Use errors() when managing
topic using topic name");
+ }
+ if (metadata.errorCode() != Errors.NONE.code())
+ errors.put(metadata.topicId(),
Errors.forCode(metadata.errorCode()));
+ }
+ return errors;
+ }
+
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> errorCounts = new HashMap<>();
diff --git a/clients/src/main/resources/common/message/MetadataRequest.json
b/clients/src/main/resources/common/message/MetadataRequest.json
index 2d88a0d..5da95cf 100644
--- a/clients/src/main/resources/common/message/MetadataRequest.json
+++ b/clients/src/main/resources/common/message/MetadataRequest.json
@@ -18,7 +18,7 @@
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "MetadataRequest",
- "validVersions": "0-11",
+ "validVersions": "0-12",
"flexibleVersions": "9+",
"fields": [
// In version 0, an empty array indicates "request metadata for all
topics." In version 1 and
@@ -38,6 +38,7 @@
//
// Version 11 deprecates IncludeClusterAuthorizedOperations field. This is
now exposed
// by the DescribeCluster API (KIP-700).
+  // Version 12 supports topic IDs.
{ "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+",
"nullableVersions": "1+",
"about": "The topics to fetch metadata for.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable":
true, "about": "The topic id." },
diff --git a/clients/src/main/resources/common/message/MetadataResponse.json
b/clients/src/main/resources/common/message/MetadataResponse.json
index d005294..714b28b 100644
--- a/clients/src/main/resources/common/message/MetadataResponse.json
+++ b/clients/src/main/resources/common/message/MetadataResponse.json
@@ -41,7 +41,8 @@
//
// Version 11 deprecates ClusterAuthorizedOperations. This is now exposed
// by the DescribeCluster API (KIP-700).
- "validVersions": "0-11",
+  // Version 12 supports topic IDs.
+ "validVersions": "0-12",
"flexibleVersions": "9+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+",
"ignorable": true,
@@ -65,7 +66,7 @@
"about": "Each topic in the response.", "fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The topic error, or 0 if there was no error." },
- { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
"entityType": "topicName",
+ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
"entityType": "topicName", "nullableVersions": "12+",
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable":
true, "about": "The topic id." },
{ "name": "IsInternal", "type": "bool", "versions": "1+", "default":
"false", "ignorable": true,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index 019566a..3db5739 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -26,6 +26,7 @@ import
org.apache.kafka.clients.admin.internals.MetadataOperationContext;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.internals.KafkaFutureImpl;
public class AdminClientTestUtils {
@@ -70,7 +71,7 @@ public class AdminClientTestUtils {
*/
public static ListTopicsResult listTopicsResult(String topic) {
KafkaFutureImpl<Map<String, TopicListing>> future = new
KafkaFutureImpl<>();
- future.complete(Collections.singletonMap(topic, new
TopicListing(topic, false)));
+ future.complete(Collections.singletonMap(topic, new
TopicListing(topic, Uuid.ZERO_UUID, false)));
return new ListTopicsResult(future);
}
@@ -93,11 +94,11 @@ public class AdminClientTestUtils {
public static DescribeTopicsResult describeTopicsResult(String topic,
TopicDescription description) {
KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
future.complete(description);
- return new DescribeTopicsResult(Collections.singletonMap(topic,
future));
+ return
DescribeTopicsResult.ofTopicNames(Collections.singletonMap(topic, future));
}
public static DescribeTopicsResult describeTopicsResult(Map<String,
TopicDescription> topicDescriptions) {
- return new DescribeTopicsResult(topicDescriptions.entrySet().stream()
+ return
DescribeTopicsResult.ofTopicNames(topicDescriptions.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e ->
KafkaFuture.completedFuture(e.getValue()))));
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index f1ace85..46542db 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -1203,7 +1203,7 @@ public class KafkaAdminClientTest {
assertEquals(0, env.kafkaClient().inFlightRequestCount());
Map<String, KafkaFuture<TopicDescription>> describeFutures =
- env.adminClient().describeTopics(sillyTopicNames).values();
+
env.adminClient().describeTopics(sillyTopicNames).topicNameValues();
for (String sillyTopicName : sillyTopicNames) {
TestUtils.assertFutureError(describeFutures.get(sillyTopicName),
InvalidTopicException.class);
}
@@ -1255,7 +1255,7 @@ public class KafkaAdminClientTest {
singletonList(partitionMetadata),
MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED))));
DescribeTopicsResult result =
env.adminClient().describeTopics(singleton(topic));
- Map<String, TopicDescription> topicDescriptions =
result.all().get();
+ Map<String, TopicDescription> topicDescriptions =
result.allTopicNames().get();
assertEquals(leader,
topicDescriptions.get(topic).partitions().get(0).leader());
assertNull(topicDescriptions.get(topic).authorizedOperations());
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 2ef4a26..473edae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnknownTopicIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
@@ -199,10 +200,18 @@ public class MockAdminClient extends AdminClient {
this.controller = controller;
}
- synchronized public void addTopic(boolean internal,
+ public void addTopic(boolean internal,
String name,
List<TopicPartitionInfo> partitions,
Map<String, String> configs) {
+ addTopic(internal, name, partitions, configs, true);
+ }
+
+ synchronized public void addTopic(boolean internal,
+ String name,
+ List<TopicPartitionInfo> partitions,
+ Map<String, String> configs,
+ boolean usesTopicId) {
if (allTopics.containsKey(name)) {
throw new IllegalArgumentException(String.format("Topic %s was
already added.", name));
}
@@ -223,10 +232,15 @@ public class MockAdminClient extends AdminClient {
logDirs.add(brokerLogDirs.get(partition.leader().id()).get(0));
}
}
- allTopics.put(name, new TopicMetadata(internal, partitions, logDirs,
configs));
- Uuid id = Uuid.randomUuid();
- topicIds.put(name, id);
- topicNames.put(id, name);
+ Uuid topicId;
+ if (usesTopicId) {
+ topicId = Uuid.randomUuid();
+ topicIds.put(name, topicId);
+ topicNames.put(topicId, name);
+ } else {
+ topicId = Uuid.ZERO_UUID;
+ }
+ allTopics.put(name, new TopicMetadata(topicId, internal, partitions,
logDirs, configs));
}
synchronized public void markTopicForDeletion(final String name) {
@@ -317,10 +331,10 @@ public class MockAdminClient extends AdminClient {
partitions.add(new TopicPartitionInfo(i, brokers.get(0),
replicas, Collections.emptyList()));
logDirs.add(brokerLogDirs.get(partitions.get(i).leader().id()).get(0));
}
- allTopics.put(topicName, new TopicMetadata(false, partitions,
logDirs, newTopic.configs()));
- Uuid id = Uuid.randomUuid();
- topicIds.put(topicName, id);
- topicNames.put(id, topicName);
+ Uuid topicId = Uuid.randomUuid();
+ topicIds.put(topicName, topicId);
+ topicNames.put(topicId, topicName);
+ allTopics.put(topicName, new TopicMetadata(topicId, false,
partitions, logDirs, newTopic.configs()));
future.complete(null);
createTopicResult.put(topicName, future);
}
@@ -345,7 +359,7 @@ public class MockAdminClient extends AdminClient {
if (topicDescription.getValue().fetchesRemainingUntilVisible > 0) {
topicDescription.getValue().fetchesRemainingUntilVisible--;
} else {
- topicListings.put(topicName, new TopicListing(topicName,
topicDescription.getValue().isInternalTopic));
+ topicListings.put(topicName, new TopicListing(topicName,
topicDescription.getValue().topicId,
topicDescription.getValue().isInternalTopic));
}
}
@@ -355,7 +369,16 @@ public class MockAdminClient extends AdminClient {
}
@Override
- synchronized public DescribeTopicsResult describeTopics(Collection<String>
topicNames, DescribeTopicsOptions options) {
+ synchronized public DescribeTopicsResult describeTopics(TopicCollection
topics, DescribeTopicsOptions options) {
+ if (topics instanceof TopicIdCollection)
+ return DescribeTopicsResult.ofTopicIds(new
HashMap<>(handleDescribeTopicsUsingIds(((TopicIdCollection) topics).topicIds(),
options)));
+ else if (topics instanceof TopicNameCollection)
+ return DescribeTopicsResult.ofTopicNames(new
HashMap<>(handleDescribeTopicsByNames(((TopicNameCollection)
topics).topicNames(), options)));
+ else
+ throw new IllegalArgumentException("The TopicCollection provided
did not match any supported classes for describeTopics.");
+ }
+
+ private Map<String, KafkaFuture<TopicDescription>>
handleDescribeTopicsByNames(Collection<String> topicNames,
DescribeTopicsOptions options) {
Map<String, KafkaFuture<TopicDescription>> topicDescriptions = new
HashMap<>();
if (timeoutNextRequests > 0) {
@@ -366,20 +389,20 @@ public class MockAdminClient extends AdminClient {
}
--timeoutNextRequests;
- return new DescribeTopicsResult(topicDescriptions);
+ return topicDescriptions;
}
for (String requestedTopic : topicNames) {
for (Map.Entry<String, TopicMetadata> topicDescription :
allTopics.entrySet()) {
String topicName = topicDescription.getKey();
+ Uuid topicId = topicIds.getOrDefault(topicName,
Uuid.ZERO_UUID);
if (topicName.equals(requestedTopic) &&
!topicDescription.getValue().markedForDeletion) {
if
(topicDescription.getValue().fetchesRemainingUntilVisible > 0) {
topicDescription.getValue().fetchesRemainingUntilVisible--;
} else {
TopicMetadata topicMetadata =
topicDescription.getValue();
KafkaFutureImpl<TopicDescription> future = new
KafkaFutureImpl<>();
- future.complete(new TopicDescription(topicName,
topicMetadata.isInternalTopic, topicMetadata.partitions,
- Collections.emptySet()));
+ future.complete(new TopicDescription(topicName,
topicMetadata.isInternalTopic, topicMetadata.partitions,
Collections.emptySet(), topicId));
topicDescriptions.put(topicName, future);
break;
}
@@ -392,7 +415,49 @@ public class MockAdminClient extends AdminClient {
}
}
- return new DescribeTopicsResult(topicDescriptions);
+ return topicDescriptions;
+ }
+
+ synchronized public Map<Uuid, KafkaFuture<TopicDescription>>
handleDescribeTopicsUsingIds(Collection<Uuid> topicIds, DescribeTopicsOptions
options) {
+
+ Map<Uuid, KafkaFuture<TopicDescription>> topicDescriptions = new
HashMap<>();
+
+ if (timeoutNextRequests > 0) {
+ for (Uuid requestedTopicId : topicIds) {
+ KafkaFutureImpl<TopicDescription> future = new
KafkaFutureImpl<>();
+ future.completeExceptionally(new TimeoutException());
+ topicDescriptions.put(requestedTopicId, future);
+ }
+
+ --timeoutNextRequests;
+ return topicDescriptions;
+ }
+
+ for (Uuid requestedTopicId : topicIds) {
+ for (Map.Entry<String, TopicMetadata> topicDescription :
allTopics.entrySet()) {
+ String topicName = topicDescription.getKey();
+ Uuid topicId = this.topicIds.get(topicName);
+
+ if (topicId != null && topicId.equals(requestedTopicId) &&
!topicDescription.getValue().markedForDeletion) {
+ if
(topicDescription.getValue().fetchesRemainingUntilVisible > 0) {
+
topicDescription.getValue().fetchesRemainingUntilVisible--;
+ } else {
+ TopicMetadata topicMetadata =
topicDescription.getValue();
+ KafkaFutureImpl<TopicDescription> future = new
KafkaFutureImpl<>();
+ future.complete(new TopicDescription(topicName,
topicMetadata.isInternalTopic, topicMetadata.partitions,
Collections.emptySet(), topicId));
+ topicDescriptions.put(requestedTopicId, future);
+ break;
+ }
+ }
+ }
+ if (!topicDescriptions.containsKey(requestedTopicId)) {
+ KafkaFutureImpl<TopicDescription> future = new
KafkaFutureImpl<>();
+ future.completeExceptionally(new
UnknownTopicIdException("Topic id " + requestedTopicId + " not found."));
+ topicDescriptions.put(requestedTopicId, future);
+ }
+ }
+
+ return topicDescriptions;
}
@Override
@@ -948,6 +1013,7 @@ public class MockAdminClient extends AdminClient {
}
private final static class TopicMetadata {
+ final Uuid topicId;
final boolean isInternalTopic;
final List<TopicPartitionInfo> partitions;
final List<String> partitionLogDirs;
@@ -956,10 +1022,12 @@ public class MockAdminClient extends AdminClient {
public boolean markedForDeletion;
- TopicMetadata(boolean isInternalTopic,
+ TopicMetadata(Uuid topicId,
+ boolean isInternalTopic,
List<TopicPartitionInfo> partitions,
List<String> partitionLogDirs,
Map<String, String> configs) {
+ this.topicId = topicId;
this.isInternalTopic = isInternalTopic;
this.partitions = partitions;
this.partitionLogDirs = partitionLogDirs;
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
index a45cf52..0a0466f 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
@@ -116,7 +116,7 @@ public class ClientAuthenticationFailureTest {
Map<String, Object> props = new HashMap<>(saslClientConfigs);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" +
server.port());
try (Admin client = Admin.create(props)) {
- KafkaFuture<Map<String, TopicDescription>> future =
client.describeTopics(Collections.singleton("test")).all();
+ KafkaFuture<Map<String, TopicDescription>> future =
client.describeTopics(Collections.singleton("test")).allTopicNames();
TestUtils.assertFutureThrows(future,
SaslAuthenticationException.class);
}
}
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index fd6bf9f..0e3ccc9 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -399,7 +399,7 @@ public class MirrorSourceConnector extends SourceConnector {
private static Collection<TopicDescription> describeTopics(AdminClient
adminClient, Collection<String> topics)
throws InterruptedException, ExecutionException {
- return adminClient.describeTopics(topics).all().get().values();
+ return
adminClient.describeTopics(topics).allTopicNames().get().values();
}
static Map<String, String> configToMap(Config config) {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index 9661c69..7b2f152 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -452,7 +452,7 @@ public class TopicAdmin implements AutoCloseable {
String topicNameList = String.join(", ", topics);
Map<String, KafkaFuture<TopicDescription>> newResults =
- admin.describeTopics(Arrays.asList(topics), new
DescribeTopicsOptions()).values();
+ admin.describeTopics(Arrays.asList(topics), new
DescribeTopicsOptions()).topicNameValues();
// Iterate over each future so that we can handle individual failures
like when some topics don't exist
Map<String, TopicDescription> existingTopics = new HashMap<>();
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index edd9891..dc25129 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -797,7 +797,7 @@ public class TopicAdminTest {
protected TopicDescription topicDescription(MockAdminClient admin, String
topicName)
throws ExecutionException, InterruptedException {
DescribeTopicsResult result =
admin.describeTopics(Collections.singleton(topicName));
- Map<String, KafkaFuture<TopicDescription>> byName = result.values();
+ Map<String, KafkaFuture<TopicDescription>> byName =
result.topicNameValues();
return byName.get(topicName).get();
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index db6f67e..17fd1ac 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -305,7 +305,7 @@ public class EmbeddedKafkaCluster {
log.info("Describing topics {}", topicNames);
try (Admin admin = createAdminClient()) {
DescribeTopicsResult result = admin.describeTopics(topicNames);
- Map<String, KafkaFuture<TopicDescription>> byName =
result.values();
+ Map<String, KafkaFuture<TopicDescription>> byName =
result.topicNameValues();
for (Map.Entry<String, KafkaFuture<TopicDescription>> entry :
byName.entrySet()) {
String topicName = entry.getKey();
try {
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 74b7224..47c1d17 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -465,7 +465,7 @@ object ConsumerGroupCommand extends Logging {
topicWithoutPartitions.asJava,
withTimeoutMs(new DescribeTopicsOptions))
- val unknownPartitions = describeTopicsResult.values().asScala.flatMap {
case (topic, future) =>
+ val unknownPartitions =
describeTopicsResult.topicNameValues().asScala.flatMap { case (topic, future) =>
Try(future.get()) match {
case Success(description) => description.partitions().asScala.map {
partition =>
new TopicPartition(topic, partition.partition())
@@ -726,7 +726,7 @@ object ConsumerGroupCommand extends Logging {
val descriptionMap = adminClient.describeTopics(
topics.asJava,
withTimeoutMs(new DescribeTopicsOptions)
- ).all().get.asScala
+ ).allTopicNames().get.asScala
descriptionMap.flatMap { case (topic, description) =>
description.partitions().asScala.map { tpInfo =>
new TopicPartition(topic, tpInfo.partition)
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 186a431..ac6304b 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -376,7 +376,7 @@ object ReassignPartitionsCommand extends Logging {
topicNamesToLookUp.add(part.topic)
}
val topicDescriptions = adminClient.
- describeTopics(topicNamesToLookUp.asJava).values().asScala
+ describeTopics(topicNamesToLookUp.asJava).topicNameValues().asScala
val notFoundResults = notFoundReassignments.map {
case (part, targetReplicas) =>
currentReassignments.get(part) match {
@@ -618,7 +618,7 @@ object ReassignPartitionsCommand extends Logging {
private def describeTopics(adminClient: Admin,
topics: Set[String])
: Map[String, TopicDescription] = {
- adminClient.describeTopics(topics.asJava).values.asScala.map { case
(topicName, topicDescriptionFuture) =>
+ adminClient.describeTopics(topics.asJava).topicNameValues().asScala.map {
case (topicName, topicDescriptionFuture) =>
try topicName -> topicDescriptionFuture.get
catch {
case t: ExecutionException if
t.getCause.isInstanceOf[UnknownTopicOrPartitionException] =>
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala
b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 8ba2e68..f7b1c87 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.clients.admin.CreatePartitionsOptions
import org.apache.kafka.clients.admin.CreateTopicsOptions
import org.apache.kafka.clients.admin.DeleteTopicsOptions
import org.apache.kafka.clients.admin.{Admin, ListTopicsOptions,
NewPartitions, NewTopic, PartitionReassignment, Config => JConfig}
-import org.apache.kafka.common.{TopicPartition, TopicPartitionInfo, Uuid}
+import org.apache.kafka.common.{TopicCollection, TopicPartition,
TopicPartitionInfo, Uuid}
import org.apache.kafka.common.config.ConfigResource.Type
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.{ClusterAuthorizationException,
TopicExistsException, UnsupportedVersionException}
@@ -264,7 +264,7 @@ object TopicCommand extends Logging {
ensureTopicExists(topics, opts.topic, !opts.ifExists)
if (topics.nonEmpty) {
- val topicsInfo =
adminClient.describeTopics(topics.asJavaCollection).values()
+ val topicsInfo =
adminClient.describeTopics(topics.asJavaCollection).topicNameValues()
val newPartitions = topics.map { topicName =>
if (topic.hasReplicaAssignment) {
val startPartitionId =
topicsInfo.get(topicName).get().partitions().size()
@@ -297,42 +297,60 @@ object TopicCommand extends Logging {
}
def describeTopic(opts: TopicCommandOptions): Unit = {
- val topics = getTopics(opts.topic, opts.excludeInternalTopics)
- ensureTopicExists(topics, opts.topic, !opts.ifExists)
+ // If a topic ID is provided and is not zero, it is used regardless of
any topic name
+ val inputTopicId = opts.topicId.map(Uuid.fromString).filter(uuid => uuid
!= Uuid.ZERO_UUID)
+ val useTopicId = inputTopicId.nonEmpty
- if (topics.nonEmpty) {
- val allConfigs = adminClient.describeConfigs(topics.map(new
ConfigResource(Type.TOPIC, _)).asJavaCollection).values()
- val liveBrokers =
adminClient.describeCluster().nodes().get().asScala.map(_.id())
- val topicDescriptions =
adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala
- val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
- val topicPartitions = topicDescriptions
- .flatMap(td => td.partitions.iterator().asScala.map(p => new
TopicPartition(td.name(), p.partition())))
- .toSet.asJava
- val reassignments = listAllReassignments(topicPartitions)
-
- for (td <- topicDescriptions) {
- val topicName = td.name
- val topicId = td.topicId()
- val config = allConfigs.get(new ConfigResource(Type.TOPIC,
topicName)).get()
- val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
-
- if (describeOptions.describeConfigs) {
- val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
- if (!opts.reportOverriddenConfigs || hasNonDefault) {
- val numPartitions = td.partitions().size
- val firstPartition = td.partitions.iterator.next()
- val reassignment = reassignments.get(new TopicPartition(td.name,
firstPartition.partition))
- val topicDesc = TopicDescription(topicName, topicId,
numPartitions, getReplicationFactor(firstPartition, reassignment), config,
markedForDeletion = false)
- topicDesc.printDescription()
- }
+ val (topicIds, topics) = if (useTopicId)
+ (getTopicIds(inputTopicId, opts.excludeInternalTopics), Seq())
+ else
+ (Seq(), getTopics(opts.topic, opts.excludeInternalTopics))
+
+ // Only check topic name when topicId is not provided
+ if (useTopicId)
+ ensureTopicIdExists(topicIds, inputTopicId, !opts.ifExists)
+ else
+ ensureTopicExists(topics, opts.topic, !opts.ifExists)
+
+ val topicDescriptions = if (topicIds.nonEmpty) {
+
adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds.toSeq.asJavaCollection)).allTopicIds().get().values().asScala
+ } else if (topics.nonEmpty) {
+
adminClient.describeTopics(TopicCollection.ofTopicNames(topics.asJavaCollection)).allTopicNames().get().values().asScala
+ } else {
+ Seq()
+ }
+
+ val topicNames = topicDescriptions.map(_.name())
+ val allConfigs = adminClient.describeConfigs(topicNames.map(new
ConfigResource(Type.TOPIC, _)).asJavaCollection).values()
+ val liveBrokers =
adminClient.describeCluster().nodes().get().asScala.map(_.id())
+ val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
+ val topicPartitions = topicDescriptions
+ .flatMap(td => td.partitions.iterator().asScala.map(p => new
TopicPartition(td.name(), p.partition())))
+ .toSet.asJava
+ val reassignments = listAllReassignments(topicPartitions)
+
+ for (td <- topicDescriptions) {
+ val topicName = td.name
+ val topicId = td.topicId()
+ val config = allConfigs.get(new ConfigResource(Type.TOPIC,
topicName)).get()
+ val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
+
+ if (describeOptions.describeConfigs) {
+ val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
+ if (!opts.reportOverriddenConfigs || hasNonDefault) {
+ val numPartitions = td.partitions().size
+ val firstPartition = td.partitions.iterator.next()
+ val reassignment = reassignments.get(new TopicPartition(td.name,
firstPartition.partition))
+ val topicDesc = TopicDescription(topicName, topicId,
numPartitions, getReplicationFactor(firstPartition, reassignment), config,
markedForDeletion = false)
+ topicDesc.printDescription()
}
+ }
- if (describeOptions.describePartitions) {
- for (partition <- sortedPartitions) {
- val reassignment = reassignments.get(new TopicPartition(td.name,
partition.partition))
- val partitionDesc = PartitionDescription(topicName, partition,
Some(config), markedForDeletion = false, reassignment)
- describeOptions.maybePrintPartitionDescription(partitionDesc)
- }
+ if (describeOptions.describePartitions) {
+ for (partition <- sortedPartitions) {
+ val reassignment = reassignments.get(new TopicPartition(td.name,
partition.partition))
+ val partitionDesc = PartitionDescription(topicName, partition,
Some(config), markedForDeletion = false, reassignment)
+ describeOptions.maybePrintPartitionDescription(partitionDesc)
}
}
}
@@ -345,13 +363,23 @@ object TopicCommand extends Logging {
.all().get()
}
- def getTopics(topicIncludelist: Option[String], excludeInternalTopics:
Boolean = false): Seq[String] = {
+ def getTopics(topicIncludeList: Option[String], excludeInternalTopics:
Boolean = false): Seq[String] = {
+ val allTopics = if (excludeInternalTopics) {
+ adminClient.listTopics()
+ } else {
+ adminClient.listTopics(new ListTopicsOptions().listInternal(true))
+ }
+ doGetTopics(allTopics.names().get().asScala.toSeq.sorted,
topicIncludeList, excludeInternalTopics)
+ }
+
+ def getTopicIds(topicIdIncludeList: Option[Uuid], excludeInternalTopics:
Boolean = false): Seq[Uuid] = {
val allTopics = if (excludeInternalTopics) {
adminClient.listTopics()
} else {
adminClient.listTopics(new ListTopicsOptions().listInternal(true))
}
- doGetTopics(allTopics.names().get().asScala.toSeq.sorted,
topicIncludelist, excludeInternalTopics)
+ val allTopicIds =
allTopics.listings().get().asScala.map(_.topicId()).toSeq.sorted
+ topicIdIncludeList.filter(allTopicIds.contains).toSeq
}
def close(): Unit = adminClient.close()
@@ -374,6 +402,23 @@ object TopicCommand extends Logging {
}
}
+ /**
+ * ensures topic existence and throws exception if topic doesn't exist
+ *
+ * @param foundTopicIds Topics that were found to match the requested topic
id.
+ * @param requestedTopicId Id of the topic that was requested.
+ * @param requireTopicIdExists Indicates if the topic needs to exist for the
operation to be successful.
+ * If set to true, the command will throw an
exception if the topic with the
+ * requested id does not exist.
+ */
+ private def ensureTopicIdExists(foundTopicIds: Seq[Uuid], requestedTopicId:
Option[Uuid], requireTopicIdExists: Boolean): Unit = {
+ // If no topic ID was requested, there is no need to throw an exception.
+ if (requestedTopicId.isDefined && requireTopicIdExists &&
foundTopicIds.isEmpty) {
+ // If given topicId doesn't exist then throw exception
+ throw new IllegalArgumentException(s"TopicId '${requestedTopicId.get}'
does not exist as expected")
+ }
+ }
+
private def doGetTopics(allTopics: Seq[String], topicIncludeList:
Option[String], excludeInternalTopics: Boolean): Seq[String] = {
if (topicIncludeList.isDefined) {
val topicsFilter = IncludeList(topicIncludeList.get)
@@ -464,6 +509,11 @@ object TopicCommand extends Logging {
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
+ private val topicIdOpt = parser.accepts("topic-id", "The topic-id to
describe. " +
+ "This is used only with --bootstrap-server option for describing
topics.")
+ .withRequiredArg
+ .describedAs("topic-id")
+ .ofType(classOf[String])
private val nl = System.getProperty("line.separator")
private val kafkaConfigsCanAlterTopicConfigsViaBootstrapServer =
" (the kafka-configs CLI supports altering topic configs with a
--bootstrap-server option)"
@@ -533,6 +583,7 @@ object TopicCommand extends Logging {
def bootstrapServer: Option[String] = valueAsOption(bootstrapServerOpt)
def commandConfig: Properties = if (has(commandConfigOpt))
Utils.loadProps(options.valueOf(commandConfigOpt)) else new Properties()
def topic: Option[String] = valueAsOption(topicOpt)
+ def topicId: Option[String] = valueAsOption(topicIdOpt)
def partitions: Option[Integer] = valueAsOption(partitionsOpt)
def replicationFactor: Option[Integer] =
valueAsOption(replicationFactorOpt)
def replicaAssignment: Option[Map[Int, List[Int]]] =
@@ -566,8 +617,12 @@ object TopicCommand extends Logging {
// check required args
if (!has(bootstrapServerOpt))
throw new IllegalArgumentException("--bootstrap-server must be
specified")
- if (has(describeOpt) && has(ifExistsOpt))
- CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
+ if (has(describeOpt) && has(ifExistsOpt)) {
+ if (!has(topicOpt) && !has(topicIdOpt))
+ CommandLineUtils.printUsageAndDie(parser, "--topic or --topic-id is
required to describe a topic")
+ if (has(topicOpt) && has(topicIdOpt))
+ println("Only topic id will be used when both --topic and --topic-id
are specified and topicId is not Uuid.ZERO_UUID")
+ }
if (!has(listOpt) && !has(describeOpt))
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
if (has(createOpt) && !has(replicaAssignmentOpt))
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 144414d..c2e0c15 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1133,11 +1133,13 @@ class KafkaApis(val requestChannel: RequestChannel,
private def metadataResponseTopic(error: Errors,
topic: String,
+ topicId: Uuid,
isInternal: Boolean,
partitionData:
util.List[MetadataResponsePartition]): MetadataResponseTopic = {
new MetadataResponseTopic()
.setErrorCode(error.code)
.setName(topic)
+ .setTopicId(topicId)
.setIsInternal(isInternal)
.setPartitions(partitionData)
}
@@ -1174,6 +1176,7 @@ class KafkaApis(val requestChannel: RequestChannel,
metadataResponseTopic(
error,
topic,
+ metadataCache.getTopicId(topic),
Topic.isInternal(topic),
util.Collections.emptyList()
)
@@ -1191,16 +1194,29 @@ class KafkaApis(val requestChannel: RequestChannel,
// Topic IDs are not supported for versions 10 and 11. Topic names can not
be null in these versions.
if (!metadataRequest.isAllTopics) {
metadataRequest.data.topics.forEach{ topic =>
- if (topic.name == null) {
+ if (topic.name == null && metadataRequest.version < 12) {
throw new InvalidRequestException(s"Topic name can not be null for
version ${metadataRequest.version}")
- } else if (topic.topicId != Uuid.ZERO_UUID) {
+ } else if (topic.topicId != Uuid.ZERO_UUID && metadataRequest.version
< 12) {
throw new InvalidRequestException(s"Topic IDs are not supported in
requests for version ${metadataRequest.version}")
}
}
}
+ // First, check whether any topic IDs were provided in the request.
+ val topicIds = metadataRequest.topicIds.asScala.toSet.filterNot(_ ==
Uuid.ZERO_UUID)
+ val useTopicId = topicIds.nonEmpty
+
+ // Only get topicIds and topicNames when supporting topicId
+ val unknownTopicIds =
topicIds.filter(metadataCache.getTopicName(_).isEmpty)
+ val knownTopicNames = topicIds.flatMap(metadataCache.getTopicName)
+
+ val unknownTopicIdsTopicMetadata = unknownTopicIds.map(topicId =>
+ metadataResponseTopic(Errors.UNKNOWN_TOPIC_ID, null, topicId, false,
util.Collections.emptyList())).toSeq
+
val topics = if (metadataRequest.isAllTopics)
metadataCache.getAllTopics()
+ else if (useTopicId)
+ knownTopicNames
else
metadataRequest.topics.asScala.toSet
@@ -1222,16 +1238,23 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val unauthorizedForCreateTopicMetadata =
unauthorizedForCreateTopics.map(topic =>
- metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic,
isInternal(topic), util.Collections.emptyList()))
+ // Set topicId to zero since the topic does not exist yet and therefore has no topic ID
+ metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic,
Uuid.ZERO_UUID, isInternal(topic), util.Collections.emptyList()))
// do not disclose the existence of topics unauthorized for Describe, so
we've not even checked if they exist or not
val unauthorizedForDescribeTopicMetadata =
// In case of all topics, don't include topics unauthorized for Describe
if ((requestVersion == 0 && (metadataRequest.topics == null ||
metadataRequest.topics.isEmpty)) || metadataRequest.isAllTopics)
Set.empty[MetadataResponseTopic]
- else
+ else if (useTopicId) {
+ // Topic IDs are not considered sensitive information, so returning
TOPIC_AUTHORIZATION_FAILED is OK
unauthorizedForDescribeTopics.map(topic =>
- metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic,
false, util.Collections.emptyList()))
+ metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, null,
metadataCache.getTopicId(topic), false, util.Collections.emptyList()))
+ } else {
+ // We should not return a topic ID on an unauthorized error, so we return a zero UUID.
+ unauthorizedForDescribeTopics.map(topic =>
+ metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic,
Uuid.ZERO_UUID, false, util.Collections.emptyList()))
+ }
// In version 0, we returned an error when brokers with replicas were
unavailable,
// while in higher versions we simply don't include the broker in the
returned broker list
@@ -1267,7 +1290,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- val completeTopicMetadata = topicMetadata ++
unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata
+ val completeTopicMetadata = unknownTopicIdsTopicMetadata ++
+ topicMetadata ++ unauthorizedForCreateTopicMetadata ++
unauthorizedForDescribeTopicMetadata
val brokers =
metadataCache.getAliveBrokerNodes(request.context.listenerName)
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala
b/core/src/main/scala/kafka/server/MetadataCache.scala
index a6d5623..67285d7 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -54,6 +54,10 @@ trait MetadataCache {
def getAliveBrokers(): Iterable[BrokerMetadata]
+ def getTopicId(topicName: String): Uuid
+
+ def getTopicName(topicId: Uuid): Option[String]
+
def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName):
Option[Node]
def getAliveBrokerNodes(listenerName: ListenerName): Iterable[Node]
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 6ae1922..b7fbd17 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -193,6 +193,10 @@ class KRaftMetadataCache(val brokerId: Int) extends
MetadataCache with Logging w
}
}
+ override def getTopicId(topicName: String): Uuid =
_currentImage.topics().topicsByName().asScala.get(topicName).map(_.id()).getOrElse(Uuid.ZERO_UUID)
+
+ override def getTopicName(topicId: Uuid): Option[String] =
_currentImage.topics().topicsById.asScala.get(topicId).map(_.name())
+
override def hasAliveBroker(brokerId: Int): Boolean = {
Option(_currentImage.cluster().broker(brokerId)).count(!_.fenced()) == 1
}
diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
index 71bfdf3..83b5eed 100755
--- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -234,6 +234,14 @@ class ZkMetadataCache(brokerId: Int) extends MetadataCache
with Logging {
metadataSnapshot.aliveBrokers.values.flatMap(_.getNode(listenerName))
}
+ def getTopicId(topicName: String): Uuid = {
+ metadataSnapshot.topicIds.getOrElse(topicName, Uuid.ZERO_UUID)
+ }
+
+ def getTopicName(topicId: Uuid): Option[String] = {
+ metadataSnapshot.topicNames.get(topicId)
+ }
+
private def addOrUpdatePartitionInfo(partitionStates:
mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
topic: String,
partitionId: Int,
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 82bc477..b2bece4 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -221,7 +221,7 @@ object ReplicaVerificationTool extends Logging {
private def listTopicsMetadata(adminClient: Admin): Seq[TopicDescription] = {
val topics = adminClient.listTopics(new
ListTopicsOptions().listInternal(true)).names.get
- adminClient.describeTopics(topics).all.get.values.asScala.toBuffer
+
adminClient.describeTopics(topics).allTopicNames.get.values.asScala.toBuffer
}
private def brokerDetails(adminClient: Admin): Map[Int, Node] = {
diff --git
a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
index b9aa208..7525cd2 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
@@ -110,7 +110,7 @@ abstract class BaseAdminIntegrationTest extends
IntegrationTestHarness with Logg
assertFutureExceptionTypeEquals(failedCreateResult.replicationFactor("mytopic3"),
classOf[TopicExistsException])
assertFutureExceptionTypeEquals(failedCreateResult.config("mytopic3"),
classOf[TopicExistsException])
- val topicToDescription = client.describeTopics(topics.asJava).all.get()
+ val topicToDescription =
client.describeTopics(topics.asJava).allTopicNames.get()
assertEquals(topics.toSet, topicToDescription.keySet.asScala)
val topic0 = topicToDescription.get("mytopic")
@@ -225,7 +225,7 @@ abstract class BaseAdminIntegrationTest extends
IntegrationTestHarness with Logg
expectedNumPartitionsOpt: Option[Int] = None):
TopicDescription = {
var result: TopicDescription = null
waitUntilTrue(() => {
- val topicResult = client.describeTopics(Set(topic).asJava,
describeOptions).values.get(topic)
+ val topicResult = client.describeTopics(Set(topic).asJava,
describeOptions).topicNameValues().get(topic)
try {
result = topicResult.get
expectedNumPartitionsOpt.map(_ ==
result.partitions.size).getOrElse(true)
diff --git
a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
index 6cc7c48..22fee47 100644
---
a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
+++
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
@@ -183,13 +183,13 @@ class DescribeAuthorizedOperationsTest extends
IntegrationTestHarness with SaslS
createTopic(Topic2)
// test without includeAuthorizedOperations flag
- var describeTopicsResult = client.describeTopics(Set(Topic1,
Topic2).asJava).all.get()
+ var describeTopicsResult = client.describeTopics(Set(Topic1,
Topic2).asJava).allTopicNames.get()
assertNull(describeTopicsResult.get(Topic1).authorizedOperations)
assertNull(describeTopicsResult.get(Topic2).authorizedOperations)
// test with includeAuthorizedOperations flag
describeTopicsResult = client.describeTopics(Set(Topic1, Topic2).asJava,
- new DescribeTopicsOptions().includeAuthorizedOperations(true)).all.get()
+ new
DescribeTopicsOptions().includeAuthorizedOperations(true)).allTopicNames.get()
assertEquals(Set(AclOperation.DESCRIBE),
describeTopicsResult.get(Topic1).authorizedOperations().asScala.toSet)
assertEquals(Set(AclOperation.DESCRIBE),
describeTopicsResult.get(Topic2).authorizedOperations().asScala.toSet)
@@ -201,7 +201,7 @@ class DescribeAuthorizedOperationsTest extends
IntegrationTestHarness with SaslS
val expectedOperations =
AclEntry.supportedOperations(ResourceType.TOPIC).asJava
describeTopicsResult = client.describeTopics(Set(Topic1, Topic2).asJava,
- new DescribeTopicsOptions().includeAuthorizedOperations(true)).all.get()
+ new
DescribeTopicsOptions().includeAuthorizedOperations(true)).allTopicNames.get()
assertEquals(expectedOperations,
describeTopicsResult.get(Topic1).authorizedOperations())
assertEquals(Set(AclOperation.DESCRIBE, AclOperation.DELETE),
describeTopicsResult.get(Topic2).authorizedOperations().asScala.toSet)
diff --git
a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index f361641..df60010 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -346,7 +346,7 @@ abstract class EndToEndAuthorizationTest extends
IntegrationTestHarness with Sas
consumer.assign(List(tp).asJava)
assertThrows(classOf[TopicAuthorizationException], () =>
consumeRecords(consumer, numRecords, topic = tp.topic))
val adminClient = createAdminClient()
- val e1 = assertThrows(classOf[ExecutionException], () =>
adminClient.describeTopics(Set(topic).asJava).all().get())
+ val e1 = assertThrows(classOf[ExecutionException], () =>
adminClient.describeTopics(Set(topic).asJava).allTopicNames().get())
assertTrue(e1.getCause.isInstanceOf[TopicAuthorizationException],
"Unexpected exception " + e1.getCause)
// Verify successful produce/consume/describe on another topic using the
same producer, consumer and adminClient
@@ -356,9 +356,9 @@ abstract class EndToEndAuthorizationTest extends
IntegrationTestHarness with Sas
sendRecords(producer, numRecords, tp2)
consumer.assign(List(tp2).asJava)
consumeRecords(consumer, numRecords, topic = topic2)
- val describeResults = adminClient.describeTopics(Set(topic,
topic2).asJava).values
+ val describeResults = adminClient.describeTopics(Set(topic,
topic2).asJava).topicNameValues()
assertEquals(1, describeResults.get(topic2).get().partitions().size())
- val e2 = assertThrows(classOf[ExecutionException], () =>
adminClient.describeTopics(Set(topic).asJava).all().get())
+ val e2 = assertThrows(classOf[ExecutionException], () =>
adminClient.describeTopics(Set(topic).asJava).allTopicNames().get())
assertTrue(e2.getCause.isInstanceOf[TopicAuthorizationException],
"Unexpected exception " + e2.getCause)
// Verify that consumer manually assigning both authorized and
unauthorized topic doesn't consume
@@ -382,7 +382,7 @@ abstract class EndToEndAuthorizationTest extends
IntegrationTestHarness with Sas
}
sendRecords(producer, numRecords, tp)
consumeRecordsIgnoreOneAuthorizationException(consumer, numRecords,
startingOffset = 0, topic)
- val describeResults2 = adminClient.describeTopics(Set(topic,
topic2).asJava).values
+ val describeResults2 = adminClient.describeTopics(Set(topic,
topic2).asJava).topicNameValues
assertEquals(1, describeResults2.get(topic).get().partitions().size())
assertEquals(1, describeResults2.get(topic2).get().partitions().size())
}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 6885c79..36586ed 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -40,17 +40,17 @@ import org.apache.kafka.common.errors._
import org.apache.kafka.common.requests.{DeleteRecordsRequest,
MetadataResponse}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern,
ResourceType}
import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.{ConsumerGroupState, ElectionType,
TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica}
+import org.apache.kafka.common.{ConsumerGroupState, ElectionType,
TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica,
Uuid}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
import org.slf4j.LoggerFactory
import scala.annotation.nowarn
-import scala.jdk.CollectionConverters._
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
+import scala.jdk.CollectionConverters._
import scala.util.Random
/**
@@ -145,7 +145,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val controller = servers.find(_.config.brokerId ==
TestUtils.waitUntilControllerElected(zkClient)).get
controller.shutdown()
controller.awaitShutdown()
- val topicDesc = client.describeTopics(topics.asJava).all.get()
+ val topicDesc = client.describeTopics(topics.asJava).allTopicNames.get()
assertEquals(topics.toSet, topicDesc.keySet.asScala)
}
@@ -161,13 +161,29 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
waitForTopics(client, Seq(existingTopic), List())
val nonExistingTopic = "non-existing"
- val results = client.describeTopics(Seq(nonExistingTopic,
existingTopic).asJava).values
+ val results = client.describeTopics(Seq(nonExistingTopic,
existingTopic).asJava).topicNameValues()
assertEquals(existingTopic, results.get(existingTopic).get.name)
assertThrows(classOf[ExecutionException], () =>
results.get(nonExistingTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException]
assertEquals(None, zkClient.getTopicPartitionCount(nonExistingTopic))
}
@Test
+ def testDescribeTopicsWithIds(): Unit = {
+ client = Admin.create(createConfig)
+
+ val existingTopic = "existing-topic"
+ client.createTopics(Seq(existingTopic).map(new NewTopic(_, 1,
1.toShort)).asJava).all.get()
+ waitForTopics(client, Seq(existingTopic), List())
+ val existingTopicId =
zkClient.getTopicIdsForTopics(Set(existingTopic)).values.head
+
+ val nonExistingTopicId = Uuid.randomUuid()
+
+ val results =
client.describeTopics(TopicCollection.ofTopicIds(Seq(existingTopicId,
nonExistingTopicId).asJava)).topicIdValues()
+ assertEquals(existingTopicId, results.get(existingTopicId).get.topicId())
+ assertThrows(classOf[ExecutionException], () =>
results.get(nonExistingTopicId).get).getCause.isInstanceOf[UnknownTopicIdException]
+ }
+
+ @Test
def testDescribeCluster(): Unit = {
client = Admin.create(createConfig)
val result = client.describeCluster
diff --git
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index 2d4abf4..2a7d5c9 100644
---
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -136,7 +136,7 @@ class SaslClientsWithInvalidCredentialsTest extends
IntegrationTestHarness with
def describeTopic(): Unit = {
try {
- val response =
adminClient.describeTopics(Collections.singleton(topic)).all.get
+ val response =
adminClient.describeTopics(Collections.singleton(topic)).allTopicNames.get
assertEquals(1, response.size)
response.forEach { (topic, description) =>
assertEquals(numPartitions, description.partitions.size)
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 08a55ae..84e8099 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -675,7 +675,7 @@ class DynamicBrokerReconfigurationTest extends
ZooKeeperTestHarness with SaslSet
consumer.commitSync()
def partitionInfo: TopicPartitionInfo =
-
adminClients.head.describeTopics(Collections.singleton(topic)).values.get(topic).get().partitions().get(0)
+
adminClients.head.describeTopics(Collections.singleton(topic)).topicNameValues().get(topic).get().partitions().get(0)
val partitionInfo0 = partitionInfo
assertEquals(partitionInfo0.replicas.get(0), partitionInfo0.leader)
diff --git
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index b7ca0f7..25e79ad 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -393,7 +393,7 @@ class KRaftClusterTest {
var currentMapping: Seq[Seq[Int]] = Seq()
val expectedMapping = Seq(Seq(2, 1, 0), Seq(0, 1, 2), Seq(2, 3),
Seq(3, 2, 0, 1))
TestUtils.waitUntilTrue( () => {
- val topicInfoMap =
admin.describeTopics(Collections.singleton("foo")).all().get()
+ val topicInfoMap =
admin.describeTopics(Collections.singleton("foo")).allTopicNames().get()
if (topicInfoMap.containsKey("foo")) {
currentMapping =
translatePartitionInfoToSeq(topicInfoMap.get("foo").partitions())
expectedMapping.equals(currentMapping)
diff --git
a/core/src/test/scala/integration/kafka/server/MetadataRequestBetweenDifferentIbpTest.scala
b/core/src/test/scala/integration/kafka/server/MetadataRequestBetweenDifferentIbpTest.scala
new file mode 100644
index 0000000..387c244
--- /dev/null
+++
b/core/src/test/scala/integration/kafka/server/MetadataRequestBetweenDifferentIbpTest.scala
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.api.{ApiVersion, KAFKA_2_8_IV0}
+import kafka.network.SocketServer
+import kafka.utils.TestUtils
+import kafka.zk.ZkVersion
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.message.MetadataRequestData
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import scala.collection.{Map, Seq}
+
+class MetadataRequestBetweenDifferentIbpTest extends BaseRequestTest {
+
+ override def brokerCount: Int = 3
+ override def generateConfigs: Seq[KafkaConfig] = {
+ Seq(
+ createConfig(0, KAFKA_2_8_IV0),
+ createConfig(1, ApiVersion.latestVersion),
+ createConfig(2, ApiVersion.latestVersion)
+ )
+ }
+
+ @Test
+ def testUnknownTopicId(): Unit = {
+ val topic = "topic"
+
+ // Force controller re-election until a broker with the latest IBP becomes the controller
+ ensureControllerIn(Seq(1, 2))
+ createTopic(topic, Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1)))
+
+ val resp1 = sendMetadataRequest(new MetadataRequest(requestData(topic,
Uuid.ZERO_UUID), 12.toShort), controllerSocketServer)
+ val topicId = resp1.topicMetadata.iterator().next().topicId()
+
+ // We could still get topic metadata by topicId
+ val topicMetadata = sendMetadataRequest(new
MetadataRequest(requestData(null, topicId), 12.toShort), controllerSocketServer)
+ .topicMetadata.iterator().next()
+ assertEquals(topicId, topicMetadata.topicId())
+ assertEquals(topic, topicMetadata.topic())
+
+ // Make the broker whose version is KAFKA_2_8_IV0 the controller
+ ensureControllerIn(Seq(0))
+
+ // Restart the broker whose ibp is higher, and the controller will send
metadata request to it
+ killBroker(1)
+ restartDeadBrokers()
+
+ // Send the request to a broker whose IBP is higher and which was just restarted
+ val resp2 = sendMetadataRequest(new MetadataRequest(requestData(topic,
topicId), 12.toShort), brokerSocketServer(1))
+ assertEquals(Errors.UNKNOWN_TOPIC_ID,
resp2.topicMetadata.iterator().next().error())
+ }
+
+ private def ensureControllerIn(brokerIds: Seq[Int]): Unit = {
+ while (!brokerIds.contains(controllerSocketServer.config.brokerId)) {
+ zkClient.deleteController(ZkVersion.MatchAnyVersion)
+ TestUtils.waitUntilControllerElected(zkClient)
+ }
+ }
+
+ private def createConfig(nodeId: Int,interBrokerVersion: ApiVersion):
KafkaConfig = {
+ val props = TestUtils.createBrokerConfig(nodeId, zkConnect)
+ props.put(KafkaConfig.InterBrokerProtocolVersionProp,
interBrokerVersion.version)
+ KafkaConfig.fromProps(props)
+ }
+
+ def requestData(topic: String, topicId: Uuid): MetadataRequestData = {
+ val data = new MetadataRequestData
+ data.topics.add(new
MetadataRequestData.MetadataRequestTopic().setName(topic).setTopicId(topicId))
+ data
+ }
+
+ private def sendMetadataRequest(request: MetadataRequest, destination:
SocketServer): MetadataResponse = {
+ connectAndReceive[MetadataResponse](request, destination)
+ }
+
+}
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 4ce7658..4a53c67 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -147,7 +147,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
private def waitUntilTopicGone(adminClient: Admin, topicName: String): Unit
= {
TestUtils.waitUntilTrue(() => {
try {
-
adminClient.describeTopics(util.Collections.singletonList(topicName)).all().get()
+
adminClient.describeTopics(util.Collections.singletonList(topicName)).allTopicNames().get()
false
} catch {
case e: ExecutionException =>
diff --git
a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
b/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
index 9a9fca8..19dbd6a 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
@@ -110,7 +110,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
val partitions = adminClient
.describeTopics(Collections.singletonList(testTopicName))
- .all()
+ .allTopicNames()
.get()
.get(testTopicName)
.partitions()
@@ -125,7 +125,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
val partitions = adminClient
.describeTopics(Collections.singletonList(testTopicName))
- .all()
+ .allTopicNames()
.get()
.get(testTopicName)
.partitions()
@@ -140,7 +140,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
val partitions = adminClient
.describeTopics(Collections.singletonList(testTopicName))
- .all()
+ .allTopicNames()
.get()
.get(testTopicName)
.partitions()
@@ -190,7 +190,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
val partitions = adminClient
.describeTopics(Collections.singletonList(testTopicName))
- .all()
+ .allTopicNames()
.get()
.get(testTopicName)
.partitions()
@@ -287,7 +287,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
topicService.alterTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--partitions", "3")))
- val topicDescription =
adminClient.describeTopics(Collections.singletonList(testTopicName)).values().get(testTopicName).get()
+ val topicDescription =
adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get()
assertTrue(topicDescription.partitions().size() == 3)
}
@@ -300,7 +300,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
topicService.alterTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2",
"--partitions", "3")))
- val topicDescription =
adminClient.describeTopics(Collections.singletonList(testTopicName)).values().get(testTopicName).get()
+ val topicDescription =
adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get()
assertTrue(topicDescription.partitions().size() == 3)
assertEquals(List(4,2),
topicDescription.partitions().get(2).replicas().asScala.map(_.id()))
}
@@ -491,7 +491,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
try {
// check which partition is on broker 0 which we'll kill
val testTopicDescription =
adminClient.describeTopics(Collections.singletonList(testTopicName))
- .all().get().asScala(testTopicName)
+ .allTopicNames().get().asScala(testTopicName)
val partitionOnBroker0 =
testTopicDescription.partitions().asScala.find(_.leader().id() ==
0).get.partition()
killBroker(0)
@@ -586,7 +586,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
val brokerIds = servers.map(_.config.brokerId)
TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds,
Set(tp), throttleBytes = 1)
- val testTopicDesc =
adminClient.describeTopics(Collections.singleton(testTopicName)).all().get().get(testTopicName)
+ val testTopicDesc =
adminClient.describeTopics(Collections.singleton(testTopicName)).allTopicNames().get().get(testTopicName)
val firstPartition = testTopicDesc.partitions().asScala.head
val replicasOfFirstPartition =
firstPartition.replicas().asScala.map(_.id())
@@ -797,7 +797,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
Collections.emptyList(), Collections.emptyList())
val describeResult =
AdminClientTestUtils.describeTopicsResult(testTopicName, new TopicDescription(
testTopicName, false, Collections.singletonList(topicPartitionInfo)))
- when(adminClient.describeTopics(any())).thenReturn(describeResult)
+
when(adminClient.describeTopics(any(classOf[java.util.Collection[String]]))).thenReturn(describeResult)
val result = AdminClientTestUtils.createPartitionsResult(testTopicName,
Errors.THROTTLING_QUOTA_EXCEEDED.exception())
when(adminClient.createPartitions(any(), any())).thenReturn(result)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 9739e92..1ddd63a 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -2232,6 +2232,117 @@ class KafkaApisTest {
assertTrue(response.topicsByError(Errors.UNKNOWN_TOPIC_OR_PARTITION).isEmpty)
}
+ @Test
+ def testUnauthorizedTopicMetadataRequest(): Unit = {
+ // 1. Set up broker information
+ val plaintextListener =
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+ val broker = new UpdateMetadataBroker()
+ .setId(0)
+ .setRack("rack")
+ .setEndpoints(Seq(
+ new UpdateMetadataEndpoint()
+ .setHost("broker0")
+ .setPort(9092)
+ .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
+ .setListener(plaintextListener.value)
+ ).asJava)
+
+ // 2. Set up authorizer
+ val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+ val unauthorizedTopic = "unauthorized-topic"
+ val authorizedTopic = "authorized-topic"
+
+ val expectedActions = Seq(
+ new Action(AclOperation.DESCRIBE, new
ResourcePattern(ResourceType.TOPIC, unauthorizedTopic, PatternType.LITERAL), 1,
true, true),
+ new Action(AclOperation.DESCRIBE, new
ResourcePattern(ResourceType.TOPIC, authorizedTopic, PatternType.LITERAL), 1,
true, true)
+ )
+
+ // Here we need to use AuthHelperTest.matchSameElements instead of
EasyMock.eq since the order of the request is unknown
+ EasyMock.expect(authorizer.authorize(anyObject[RequestContext],
AuthHelperTest.matchSameElements(expectedActions.asJava)))
+ .andAnswer { () =>
+ val actions =
EasyMock.getCurrentArguments.apply(1).asInstanceOf[util.List[Action]].asScala
+ actions.map { action =>
+ if (action.resourcePattern().name().equals(authorizedTopic))
+ AuthorizationResult.ALLOWED
+ else
+ AuthorizationResult.DENIED
+ }.asJava
+ }.times(2)
+
+ // 3. Set up MetadataCache
+ val authorizedTopicId = Uuid.randomUuid()
+ val unauthorizedTopicId = Uuid.randomUuid();
+
+ val topicIds = new util.HashMap[String, Uuid]()
+ topicIds.put(authorizedTopic, authorizedTopicId)
+ topicIds.put(unauthorizedTopic, unauthorizedTopicId)
+
+ def createDummyPartitionStates(topic: String) = {
+ new UpdateMetadataPartitionState()
+ .setTopicName(topic)
+ .setPartitionIndex(0)
+ .setControllerEpoch(0)
+ .setLeader(0)
+ .setLeaderEpoch(0)
+ .setReplicas(Collections.singletonList(0))
+ .setZkVersion(0)
+ .setIsr(Collections.singletonList(0))
+ }
+
+ // Send UpdateMetadataReq to update MetadataCache
+ val partitionStates = Seq(unauthorizedTopic,
authorizedTopic).map(createDummyPartitionStates)
+
+ val updateMetadataRequest = new
UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
+ 0, 0, partitionStates.asJava, Seq(broker).asJava, topicIds).build()
+ metadataCache.asInstanceOf[ZkMetadataCache].updateMetadata(correlationId =
0, updateMetadataRequest)
+
+ // 4. Send TopicMetadataReq using topicId
+ val metadataReqByTopicId = new
MetadataRequest.Builder(util.Arrays.asList(authorizedTopicId,
unauthorizedTopicId)).build()
+ val repByTopicId = buildRequest(metadataReqByTopicId, plaintextListener)
+ val capturedMetadataByTopicIdResp = expectNoThrottling(repByTopicId)
+ EasyMock.replay(clientRequestQuotaManager, requestChannel, authorizer)
+
+ createKafkaApis(authorizer =
Some(authorizer)).handleTopicMetadataRequest(repByTopicId)
+ val metadataByTopicIdResp =
capturedMetadataByTopicIdResp.getValue.asInstanceOf[MetadataResponse]
+
+ val metadataByTopicId =
metadataByTopicIdResp.data().topics().asScala.groupBy(_.topicId()).map(kv =>
(kv._1, kv._2.head))
+
+ metadataByTopicId.foreach{ case (topicId, metadataResponseTopic) =>
+ if (topicId == unauthorizedTopicId) {
+ // Return TOPIC_AUTHORIZATION_FAILED on an unauthorized error, even though this reveals the existence of the topic ID
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(),
metadataResponseTopic.errorCode())
+ // Do not return topic information on unauthorized error
+ assertNull(metadataResponseTopic.name())
+ } else {
+ assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode())
+ assertEquals(authorizedTopic, metadataResponseTopic.name())
+ }
+ }
+
+ // 5. Send TopicMetadataReq using topic name
+ EasyMock.reset(clientRequestQuotaManager, requestChannel)
+ val metadataReqByTopicName = new
MetadataRequest.Builder(util.Arrays.asList(authorizedTopic, unauthorizedTopic),
false).build()
+ val repByTopicName = buildRequest(metadataReqByTopicName,
plaintextListener)
+ val capturedMetadataByTopicNameResp = expectNoThrottling(repByTopicName)
+ EasyMock.replay(clientRequestQuotaManager, requestChannel)
+
+ createKafkaApis(authorizer =
Some(authorizer)).handleTopicMetadataRequest(repByTopicName)
+ val metadataByTopicNameResp =
capturedMetadataByTopicNameResp.getValue.asInstanceOf[MetadataResponse]
+
+ val metadataByTopicName =
metadataByTopicNameResp.data().topics().asScala.groupBy(_.name()).map(kv =>
(kv._1, kv._2.head))
+
+ metadataByTopicName.foreach{ case (topicName, metadataResponseTopic) =>
+ if (topicName == unauthorizedTopic) {
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(),
metadataResponseTopic.errorCode())
+ // Do not return the topic ID on an unauthorized error
+ assertEquals(Uuid.ZERO_UUID, metadataResponseTopic.topicId())
+ } else {
+ assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode())
+ assertEquals(authorizedTopicId, metadataResponseTopic.topicId())
+ }
+ }
+ }
+
/**
* Verifies that sending a fetch request with version 9 works correctly when
* ReplicaManager.getLogConfig returns None.
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 5620bf7..64003b7 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1631,7 +1631,7 @@ object TestUtils extends Logging {
waitUntilTrue(() => {
try {
- val topicResult =
client.describeTopics(Arrays.asList(topic)).all.get.get(topic)
+ val topicResult =
client.describeTopics(Arrays.asList(topic)).allTopicNames.get.get(topic)
val partitionResult = topicResult.partitions.get(partition)
Option(partitionResult.leader).map(_.id) == leader
} catch {
@@ -1643,7 +1643,7 @@ object TestUtils extends Logging {
def waitForBrokersOutOfIsr(client: Admin, partition: Set[TopicPartition],
brokerIds: Set[Int]): Unit = {
waitUntilTrue(
() => {
- val description =
client.describeTopics(partition.map(_.topic).asJava).all.get.asScala
+ val description =
client.describeTopics(partition.map(_.topic).asJava).allTopicNames.get.asScala
val isr = description
.values
.flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
@@ -1659,7 +1659,7 @@ object TestUtils extends Logging {
def waitForBrokersInIsr(client: Admin, partition: TopicPartition, brokerIds:
Set[Int]): Unit = {
waitUntilTrue(
() => {
- val description =
client.describeTopics(Set(partition.topic).asJava).all.get.asScala
+ val description =
client.describeTopics(Set(partition.topic).asJava).allTopicNames.get.asScala
val isr = description
.values
.flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
@@ -1675,7 +1675,7 @@ object TestUtils extends Logging {
def waitForReplicasAssigned(client: Admin, partition: TopicPartition,
brokerIds: Seq[Int]): Unit = {
waitUntilTrue(
() => {
- val description =
client.describeTopics(Set(partition.topic).asJava).all.get.asScala
+ val description =
client.describeTopics(Set(partition.topic).asJava).allTopicNames.get.asScala
val replicas = description
.values
.flatMap(_.partitions.asScala.flatMap(_.replicas.asScala))
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index 8d9e2da..6b47e26 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -399,7 +399,7 @@ public class TopicBasedRemoteLogMetadataManager implements
RemoteLogMetadataMana
String topicName) throws
InterruptedException, ExecutionException {
log.debug("Getting topic details to check for partition count and
replication factor.");
TopicDescription topicDescription =
adminClient.describeTopics(Collections.singleton(topicName))
-
.values().get(topicName).get();
+
.topicNameValues().get(topicName).get();
int expectedPartitions = rlmmConfig.metadataTopicPartitionsCount();
int topicPartitionsSize = topicDescription.partitions().size();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 12e6242..6954921 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -157,7 +157,7 @@ public class InternalTopicManager {
Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic =
Collections.emptyMap();
if (!topicDescriptionsStillToValidate.isEmpty()) {
final DescribeTopicsResult describeTopicsResult =
adminClient.describeTopics(topicDescriptionsStillToValidate);
- descriptionsForTopic = describeTopicsResult.values();
+ descriptionsForTopic = describeTopicsResult.topicNameValues();
}
Map<String, KafkaFuture<Config>> configsForTopic =
Collections.emptyMap();
if (!topicConfigsStillToValidate.isEmpty()) {
@@ -515,7 +515,7 @@ public class InternalTopicManager {
log.debug("Trying to check if topics {} have been created with
expected number of partitions.", topics);
final DescribeTopicsResult describeTopicsResult =
adminClient.describeTopics(topics);
- final Map<String, KafkaFuture<TopicDescription>> futures =
describeTopicsResult.values();
+ final Map<String, KafkaFuture<TopicDescription>> futures =
describeTopicsResult.topicNameValues();
final Map<String, Integer> existedTopicPartition = new HashMap<>();
for (final Map.Entry<String, KafkaFuture<TopicDescription>>
topicFuture : futures.entrySet()) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
index c2dee61..1e7f685 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
@@ -675,7 +675,7 @@ public class KStreamRepartitionIntegrationTest {
private int getNumberOfPartitionsForTopic(final String topic) throws
Exception {
try (final AdminClient adminClient = createAdminClient()) {
final TopicDescription topicDescription =
adminClient.describeTopics(Collections.singleton(topic))
- .values()
+
.topicNameValues()
.get(topic)
.get();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
index c0fc796..512d1c1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
@@ -205,7 +205,7 @@ public class
StreamTableJoinTopologyOptimizationIntegrationTest {
private int getNumberOfPartitionsForTopic(final String topic) throws
Exception {
try (final AdminClient adminClient = createAdminClient()) {
final TopicDescription topicDescription =
adminClient.describeTopics(Collections.singleton(topic))
- .values()
+
.topicNameValues()
.get(topic)
.get(IntegrationTestUtils.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index abfa5c4..853d8d7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -632,17 +632,17 @@ public class InternalTopicManagerTest {
{
add(new TopicPartitionInfo(0, broker1, singleReplica,
Collections.emptyList()));
}
- }),
mockAdminClient.describeTopics(Collections.singleton(topic1)).values().get(topic1).get());
+ }),
mockAdminClient.describeTopics(Collections.singleton(topic1)).topicNameValues().get(topic1).get());
assertEquals(new TopicDescription(topic2, false, new
ArrayList<TopicPartitionInfo>() {
{
add(new TopicPartitionInfo(0, broker1, singleReplica,
Collections.emptyList()));
}
- }),
mockAdminClient.describeTopics(Collections.singleton(topic2)).values().get(topic2).get());
+ }),
mockAdminClient.describeTopics(Collections.singleton(topic2)).topicNameValues().get(topic2).get());
assertEquals(new TopicDescription(topic3, false, new
ArrayList<TopicPartitionInfo>() {
{
add(new TopicPartitionInfo(0, broker1, singleReplica,
Collections.emptyList()));
}
- }),
mockAdminClient.describeTopics(Collections.singleton(topic3)).values().get(topic3).get());
+ }),
mockAdminClient.describeTopics(Collections.singleton(topic3)).topicNameValues().get(topic3).get());
final ConfigResource resource = new
ConfigResource(ConfigResource.Type.TOPIC, topic1);
final ConfigResource resource2 = new
ConfigResource(ConfigResource.Type.TOPIC, topic2);
@@ -1732,7 +1732,7 @@ public class InternalTopicManagerTest {
private static class MockDescribeTopicsResult extends DescribeTopicsResult
{
MockDescribeTopicsResult(final Map<String,
KafkaFuture<TopicDescription>> futures) {
- super(futures);
+ super(null, futures);
}
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index a5d6c7a..e40ca7a 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -338,7 +338,7 @@ public class ClientCompatibilityTest {
throws InterruptedException, ExecutionException {
while (true) {
try {
- client.describeTopics(topics).all().get();
+ client.describeTopics(topics).allTopicNames().get();
break;
} catch (ExecutionException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
index b8f1c9d..10a368a 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
@@ -750,7 +750,7 @@ public abstract class TransactionsCommand {
List<TopicPartition> topicPartitions
) throws Exception {
try {
- Map<String, TopicDescription> topicDescriptions =
admin.describeTopics(topics).all().get();
+ Map<String, TopicDescription> topicDescriptions =
admin.describeTopics(topics).allTopicNames().get();
topicDescriptions.forEach((topic, description) -> {
description.partitions().forEach(partitionInfo -> {
if (!brokerId.isPresent() ||
hasReplica(brokerId.get(), partitionInfo)) {
diff --git
a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
index 968c34a..98704e5 100644
--- a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
@@ -540,7 +540,7 @@ public class TransactionsCommandTest {
Map<String, TopicDescription> descriptions
) {
DescribeTopicsResult result = Mockito.mock(DescribeTopicsResult.class);
- Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+
Mockito.when(result.allTopicNames()).thenReturn(completedFuture(descriptions));
Mockito.when(admin.describeTopics(new
ArrayList<>(descriptions.keySet()))).thenReturn(result);
}
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
index 812129e..23c0ba4 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
@@ -279,7 +279,7 @@ public final class WorkerUtils {
try {
DescribeTopicsResult topicsResult = adminClient.describeTopics(
topicsToVerify, new
DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
- return topicsResult.all().get();
+ return topicsResult.allTopicNames().get();
} catch (ExecutionException exception) {
if (exception.getCause() instanceof
UnknownTopicOrPartitionException) {
lastException = (UnknownTopicOrPartitionException)
exception.getCause();
@@ -321,7 +321,7 @@ public final class WorkerUtils {
List<TopicPartition> out = new ArrayList<>();
DescribeTopicsResult topicsResult = adminClient.describeTopics(
matchedTopics, new
DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
- Map<String, TopicDescription> topicDescriptionMap =
topicsResult.all().get();
+ Map<String, TopicDescription> topicDescriptionMap =
topicsResult.allTopicNames().get();
for (TopicDescription desc: topicDescriptionMap.values()) {
List<TopicPartitionInfo> partitions = desc.partitions();
for (TopicPartitionInfo info: partitions) {
diff --git
a/trogdor/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
b/trogdor/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
index 738ec3c..a5fbc85 100644
--- a/trogdor/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
+++ b/trogdor/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
@@ -81,7 +81,7 @@ public class WorkerUtilsTest {
Collections.singletonList(
new TopicPartitionInfo(0, broker1, singleReplica,
Collections.<Node>emptyList()))),
adminClient.describeTopics(
-
Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get()
+
Collections.singleton(TEST_TOPIC)).topicNameValues().get(TEST_TOPIC).get()
);
}
@@ -98,7 +98,7 @@ public class WorkerUtilsTest {
Collections.singletonList(
new TopicPartitionInfo(0, broker1, singleReplica,
Collections.<Node>emptyList()))),
adminClient.describeTopics(
-
Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get()
+
Collections.singleton(TEST_TOPIC)).topicNameValues().get(TEST_TOPIC).get()
);
}
@@ -177,7 +177,7 @@ public class WorkerUtilsTest {
TEST_TOPIC, false,
Collections.singletonList(
new TopicPartitionInfo(0, broker1, singleReplica,
Collections.<Node>emptyList()))),
-
adminClient.describeTopics(Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get()
+
adminClient.describeTopics(Collections.singleton(TEST_TOPIC)).topicNameValues().get(TEST_TOPIC).get()
);
}