This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1d22b0d  KAFKA-10774; Admin API for Describe topic using topic IDs (#9769)
1d22b0d is described below

commit 1d22b0d70686aef5689b775ea2ea7610a37f3e8c
Author: dengziming <[email protected]>
AuthorDate: Sat Aug 28 16:00:36 2021 +0800

    KAFKA-10774; Admin API for Describe topic using topic IDs (#9769)
    
    Reviewers: Justine Olshan <[email protected]>, Chia-Ping Tsai <[email protected]>, Satish Duggana <[email protected]>, Rajini Sivaram <[email protected]>
---
 .../java/org/apache/kafka/clients/admin/Admin.java |  44 +++++--
 .../kafka/clients/admin/DeleteTopicsResult.java    |   2 +-
 .../kafka/clients/admin/DescribeTopicsResult.java  |  97 +++++++++++++--
 .../kafka/clients/admin/KafkaAdminClient.java      | 127 ++++++++++++++++----
 .../kafka/clients/admin/TopicDescription.java      |  12 +-
 .../apache/kafka/clients/admin/TopicListing.java   |  28 ++++-
 .../main/java/org/apache/kafka/common/Cluster.java |   8 ++
 .../kafka/common/requests/MetadataRequest.java     |  40 ++++++-
 .../kafka/common/requests/MetadataResponse.java    |  19 +++
 .../resources/common/message/MetadataRequest.json  |   3 +-
 .../resources/common/message/MetadataResponse.json |   5 +-
 .../kafka/clients/admin/AdminClientTestUtils.java  |   7 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  |   4 +-
 .../kafka/clients/admin/MockAdminClient.java       | 100 +++++++++++++---
 .../ClientAuthenticationFailureTest.java           |   2 +-
 .../connect/mirror/MirrorSourceConnector.java      |   2 +-
 .../org/apache/kafka/connect/util/TopicAdmin.java  |   2 +-
 .../apache/kafka/connect/util/TopicAdminTest.java  |   2 +-
 .../util/clusters/EmbeddedKafkaCluster.java        |   2 +-
 .../scala/kafka/admin/ConsumerGroupCommand.scala   |   4 +-
 .../kafka/admin/ReassignPartitionsCommand.scala    |   4 +-
 core/src/main/scala/kafka/admin/TopicCommand.scala | 133 +++++++++++++++------
 core/src/main/scala/kafka/server/KafkaApis.scala   |  36 +++++-
 .../main/scala/kafka/server/MetadataCache.scala    |   4 +
 .../kafka/server/metadata/KRaftMetadataCache.scala |   4 +
 .../kafka/server/metadata/ZkMetadataCache.scala    |   8 ++
 .../kafka/tools/ReplicaVerificationTool.scala      |   2 +-
 .../kafka/api/BaseAdminIntegrationTest.scala       |   4 +-
 .../api/DescribeAuthorizedOperationsTest.scala     |   6 +-
 .../kafka/api/EndToEndAuthorizationTest.scala      |   8 +-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |  24 +++-
 .../SaslClientsWithInvalidCredentialsTest.scala    |   2 +-
 .../server/DynamicBrokerReconfigurationTest.scala  |   2 +-
 .../kafka/server/KRaftClusterTest.scala            |   2 +-
 .../MetadataRequestBetweenDifferentIbpTest.scala   |  96 +++++++++++++++
 .../scala/unit/kafka/admin/DeleteTopicTest.scala   |   2 +-
 .../kafka/admin/TopicCommandIntegrationTest.scala  |  18 +--
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 111 +++++++++++++++++
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   8 +-
 .../TopicBasedRemoteLogMetadataManager.java        |   2 +-
 .../processor/internals/InternalTopicManager.java  |   4 +-
 .../KStreamRepartitionIntegrationTest.java         |   2 +-
 ...bleJoinTopologyOptimizationIntegrationTest.java |   2 +-
 .../internals/InternalTopicManagerTest.java        |   8 +-
 .../kafka/tools/ClientCompatibilityTest.java       |   2 +-
 .../apache/kafka/tools/TransactionsCommand.java    |   2 +-
 .../kafka/tools/TransactionsCommandTest.java       |   2 +-
 .../apache/kafka/trogdor/common/WorkerUtils.java   |   4 +-
 .../kafka/trogdor/common/WorkerUtilsTest.java      |   6 +-
 49 files changed, 846 insertions(+), 172 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 54d103b..4b6fe49 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -17,14 +17,6 @@
 
 package org.apache.kafka.clients.admin;
 
-import java.time.Duration;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.ElectionType;
 import org.apache.kafka.common.KafkaFuture;
@@ -42,6 +34,14 @@ import org.apache.kafka.common.quota.ClientQuotaAlteration;
 import org.apache.kafka.common.quota.ClientQuotaFilter;
 import org.apache.kafka.common.requests.LeaveGroupResponse;
 
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
 /**
  * The administrative client for Kafka, which supports managing and inspecting 
topics, brokers, configurations and ACLs.
  * <p>
@@ -303,7 +303,33 @@ public interface Admin extends AutoCloseable {
      * @param options    The options to use when describing the topic.
      * @return The DescribeTopicsResult.
      */
-    DescribeTopicsResult describeTopics(Collection<String> topicNames, 
DescribeTopicsOptions options);
+    default DescribeTopicsResult describeTopics(Collection<String> topicNames, 
DescribeTopicsOptions options) {
+        return describeTopics(TopicCollection.ofTopicNames(topicNames), 
options);
+    }
+
+    /**
+     * This is a convenience method for {@link 
#describeTopics(TopicCollection, DescribeTopicsOptions)}
+     * with default options. See the overload for more details.
+     * <p>
+     * When using topic IDs, this operation is supported by brokers with 
version 3.1.0 or higher.
+     *
+     * @param topics The topics to describe.
+     * @return The DescribeTopicsResult.
+     */
+    default DescribeTopicsResult describeTopics(TopicCollection topics) {
+        return describeTopics(topics, new DescribeTopicsOptions());
+    }
+
+    /**
+     * Describe some topics in the cluster.
+     *
+     * When using topic IDs, this operation is supported by brokers with 
version 3.1.0 or higher.
+     *
+     * @param topics  The topics to describe.
+     * @param options The options to use when describing the topics.
+     * @return The DescribeTopicsResult.
+     */
+    DescribeTopicsResult describeTopics(TopicCollection topics, 
DescribeTopicsOptions options);
 
     /**
      * Get information about the nodes in the cluster, using the default 
options.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
index b0d23d9..725b82a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java
@@ -70,12 +70,12 @@ public class DeleteTopicsResult {
         return nameFutures;
     }
 
-    @Deprecated
     /**
      * @return a map from topic names to futures which can be used to check 
the status of
      * individual deletions if the deleteTopics request used topic names. 
Otherwise return null.
      * @deprecated Since 3.0 use {@link #topicNameValues} instead
      */
+    @Deprecated
     public Map<String, KafkaFuture<Void>> values() {
         return nameFutures;
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
index 7753984..41593c5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicCollection;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.util.Collection;
@@ -32,28 +34,105 @@ import java.util.concurrent.ExecutionException;
  */
 @InterfaceStability.Evolving
 public class DescribeTopicsResult {
-    private final Map<String, KafkaFuture<TopicDescription>> futures;
+    private final Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures;
+    private final Map<String, KafkaFuture<TopicDescription>> nameFutures;
 
+    @Deprecated
     protected DescribeTopicsResult(Map<String, KafkaFuture<TopicDescription>> 
futures) {
-        this.futures = futures;
+        this(null, futures);
+    }
+
+    // VisibleForTesting
+    protected DescribeTopicsResult(Map<Uuid, KafkaFuture<TopicDescription>> 
topicIdFutures, Map<String, KafkaFuture<TopicDescription>> nameFutures) {
+        if (topicIdFutures != null && nameFutures != null)
+            throw new IllegalArgumentException("topicIdFutures and nameFutures 
cannot both be specified.");
+        if (topicIdFutures == null && nameFutures == null)
+            throw new IllegalArgumentException("topicIdFutures and nameFutures 
cannot both be null.");
+        this.topicIdFutures = topicIdFutures;
+        this.nameFutures = nameFutures;
+    }
+
+    static DescribeTopicsResult ofTopicIds(Map<Uuid, 
KafkaFuture<TopicDescription>> topicIdFutures) {
+        return new DescribeTopicsResult(topicIdFutures, null);
+    }
+
+    static DescribeTopicsResult ofTopicNames(Map<String, 
KafkaFuture<TopicDescription>> nameFutures) {
+        return new DescribeTopicsResult(null, nameFutures);
+    }
+
+    /**
+     * Use when {@link Admin#describeTopics(TopicCollection, 
DescribeTopicsOptions)} used a TopicIdCollection
+     *
+     * @return a map from topic IDs to futures which can be used to check the 
status of
+     *         individual topics if the request used topic IDs, otherwise 
return null.
+     */
+    public Map<Uuid, KafkaFuture<TopicDescription>> topicIdValues() {
+        return topicIdFutures;
+    }
+
+    /**
+     * Use when {@link Admin#describeTopics(TopicCollection, 
DescribeTopicsOptions)} used a TopicNameCollection
+     *
+     * @return a map from topic names to futures which can be used to check 
the status of
+     *         individual topics if the request used topic names, otherwise 
return null.
+     */
+    public Map<String, KafkaFuture<TopicDescription>> topicNameValues() {
+        return nameFutures;
     }
 
     /**
-     * Return a map from topic names to futures which can be used to check the 
status of
-     * individual topics.
+     * @return a map from topic names to futures which can be used to check 
the status of
+     *         individual topics if the request used topic names, otherwise 
return null.
+     *
+     * @deprecated Since 3.1.0 use {@link #topicNameValues} instead
      */
+    @Deprecated
     public Map<String, KafkaFuture<TopicDescription>> values() {
-        return futures;
+        return nameFutures;
     }
 
     /**
-     * Return a future which succeeds only if all the topic descriptions 
succeed.
+     * @return A future map from topic names to descriptions which can be used 
to check
+     *         the status of individual description if the describe topic 
request used
+     *         topic names, otherwise return null, this request succeeds only 
if all the
+     *         topic descriptions succeed
+     *
+     * @deprecated Since 3.1.0 use {@link #allTopicNames()} instead
      */
+    @Deprecated
     public KafkaFuture<Map<String, TopicDescription>> all() {
-        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).
+        return all(nameFutures);
+    }
+
+    /**
+     * @return A future map from topic names to descriptions which can be used 
to check
+     *         the status of individual description if the describe topic 
request used
+     *         topic names, otherwise return null, this request succeeds only 
if all the
+     *         topic descriptions succeed
+     */
+    public KafkaFuture<Map<String, TopicDescription>> allTopicNames() {
+        return all(nameFutures);
+    }
+
+    /**
+     * @return A future map from topic ids to descriptions which can be used 
to check the
+     *         status of individual description if the describe topic request 
used topic
+     *         ids, otherwise return null, this request succeeds only if all 
the topic
+     *         descriptions succeed
+     */
+    public KafkaFuture<Map<Uuid, TopicDescription>> allTopicIds() {
+        return all(topicIdFutures);
+    }
+
+    /**
+     * Return a future which succeeds only if all the topic descriptions 
succeed.
+     */
+    private static <T> KafkaFuture<Map<T, TopicDescription>> all(Map<T, 
KafkaFuture<TopicDescription>> futures) {
+        KafkaFuture<Void> future = 
KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+        return future.
             thenApply(v -> {
-                Map<String, TopicDescription> descriptions = new 
HashMap<>(futures.size());
-                for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : 
futures.entrySet()) {
+                Map<T, TopicDescription> descriptions = new 
HashMap<>(futures.size());
+                for (Map.Entry<T, KafkaFuture<TopicDescription>> entry : 
futures.entrySet()) {
                     try {
                         descriptions.put(entry.getKey(), 
entry.getValue().get());
                     } catch (InterruptedException | ExecutionException e) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 981598f..4fae1cc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -271,6 +271,7 @@ import static 
org.apache.kafka.common.message.ListPartitionReassignmentsRequestD
 import static 
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
 import static 
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
 import static 
org.apache.kafka.common.requests.MetadataRequest.convertToMetadataRequestTopic;
+import static 
org.apache.kafka.common.requests.MetadataRequest.convertTopicIdsToMetadataRequestTopic;
 import static org.apache.kafka.common.utils.Utils.closeQuietly;
 
 /**
@@ -1497,6 +1498,10 @@ public class KafkaAdminClient extends AdminClient {
         return topicName == null || topicName.isEmpty();
     }
 
+    private static boolean topicIdIsUnrepresentable(Uuid topicId) {
+        return topicId == null || topicId == Uuid.ZERO_UUID;
+    }
+
     // for testing
     int numPendingCalls() {
         return runnable.pendingCalls.size();
@@ -1884,7 +1889,7 @@ public class KafkaAdminClient extends AdminClient {
                     String topicName = topicMetadata.topic();
                     boolean isInternal = topicMetadata.isInternal();
                     if (!topicMetadata.isInternal() || 
options.shouldListInternal())
-                        topicListing.put(topicName, new 
TopicListing(topicName, isInternal));
+                        topicListing.put(topicName, new 
TopicListing(topicName, topicMetadata.topicId(), isInternal));
                 }
                 topicListingFuture.complete(topicListing);
             }
@@ -1898,7 +1903,16 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     @Override
-    public DescribeTopicsResult describeTopics(final Collection<String> 
topicNames, DescribeTopicsOptions options) {
+    public DescribeTopicsResult describeTopics(final TopicCollection topics, 
DescribeTopicsOptions options) {
+        if (topics instanceof TopicIdCollection)
+            return 
DescribeTopicsResult.ofTopicIds(handleDescribeTopicsByIds(((TopicIdCollection) 
topics).topicIds(), options));
+        else if (topics instanceof TopicNameCollection)
+            return 
DescribeTopicsResult.ofTopicNames(handleDescribeTopicsByNames(((TopicNameCollection)
 topics).topicNames(), options));
+        else
+            throw new IllegalArgumentException("The TopicCollection: " + 
topics + " provided did not match any supported classes for describeTopics.");
+    }
+
+    private Map<String, KafkaFuture<TopicDescription>> 
handleDescribeTopicsByNames(final Collection<String> topicNames, 
DescribeTopicsOptions options) {
         final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = 
new HashMap<>(topicNames.size());
         final ArrayList<String> topicNamesList = new ArrayList<>();
         for (String topicName : topicNames) {
@@ -1947,28 +1961,13 @@ public class KafkaAdminClient extends AdminClient {
                         future.completeExceptionally(new 
UnknownTopicOrPartitionException("Topic " + topicName + " not found."));
                         continue;
                     }
-                    boolean isInternal = 
cluster.internalTopics().contains(topicName);
-                    List<PartitionInfo> partitionInfos = 
cluster.partitionsForTopic(topicName);
-                    List<TopicPartitionInfo> partitions = new 
ArrayList<>(partitionInfos.size());
-                    for (PartitionInfo partitionInfo : partitionInfos) {
-                        TopicPartitionInfo topicPartitionInfo = new 
TopicPartitionInfo(
-                            partitionInfo.partition(), leader(partitionInfo), 
Arrays.asList(partitionInfo.replicas()),
-                            Arrays.asList(partitionInfo.inSyncReplicas()));
-                        partitions.add(topicPartitionInfo);
-                    }
-                    
partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
-                    TopicDescription topicDescription = new 
TopicDescription(topicName, isInternal, partitions,
-                        
validAclOperations(response.topicAuthorizedOperations(topicName).get()), 
cluster.topicId(topicName));
+                    Uuid topicId = cluster.topicId(topicName);
+                    Integer authorizedOperations = 
response.topicAuthorizedOperations(topicName).get();
+                    TopicDescription topicDescription = 
getTopicDescriptionFromCluster(cluster, topicName, topicId, 
authorizedOperations);
                     future.complete(topicDescription);
                 }
             }
 
-            private Node leader(PartitionInfo partitionInfo) {
-                if (partitionInfo.leader() == null || 
partitionInfo.leader().id() == Node.noNode().id())
-                    return null;
-                return partitionInfo.leader();
-            }
-
             @Override
             boolean 
handleUnsupportedVersionException(UnsupportedVersionException exception) {
                 if (supportsDisablingTopicCreation) {
@@ -1986,7 +1985,93 @@ public class KafkaAdminClient extends AdminClient {
         if (!topicNamesList.isEmpty()) {
             runnable.call(call, now);
         }
-        return new DescribeTopicsResult(new HashMap<>(topicFutures));
+        return new HashMap<>(topicFutures);
+    }
+
+    private Map<Uuid, KafkaFuture<TopicDescription>> 
handleDescribeTopicsByIds(Collection<Uuid> topicIds, DescribeTopicsOptions 
options) {
+
+        final Map<Uuid, KafkaFutureImpl<TopicDescription>> topicFutures = new 
HashMap<>(topicIds.size());
+        final List<Uuid> topicIdsList = new ArrayList<>();
+        for (Uuid topicId : topicIds) {
+            if (topicIdIsUnrepresentable(topicId)) {
+                KafkaFutureImpl<TopicDescription> future = new 
KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The 
given topic id '" +
+                        topicId + "' cannot be represented in a request."));
+                topicFutures.put(topicId, future);
+            } else if (!topicFutures.containsKey(topicId)) {
+                topicFutures.put(topicId, new KafkaFutureImpl<>());
+                topicIdsList.add(topicId);
+            }
+        }
+        final long now = time.milliseconds();
+        Call call = new Call("describeTopicsWithIds", calcDeadlineMs(now, 
options.timeoutMs()),
+                new LeastLoadedNodeProvider()) {
+
+            @Override
+            MetadataRequest.Builder createRequest(int timeoutMs) {
+                return new MetadataRequest.Builder(new MetadataRequestData()
+                        
.setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList))
+                        .setAllowAutoTopicCreation(false)
+                        
.setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                MetadataResponse response = (MetadataResponse) 
abstractResponse;
+                // Handle server responses for particular topics.
+                Cluster cluster = response.buildCluster();
+                Map<Uuid, Errors> errors = response.errorsByTopicId();
+                for (Map.Entry<Uuid, KafkaFutureImpl<TopicDescription>> entry 
: topicFutures.entrySet()) {
+                    Uuid topicId = entry.getKey();
+                    KafkaFutureImpl<TopicDescription> future = 
entry.getValue();
+
+                    String topicName = cluster.topicName(topicId);
+                    if (topicName == null) {
+                        future.completeExceptionally(new 
InvalidTopicException("TopicId " + topicId + " not found."));
+                        continue;
+                    }
+                    Errors topicError = errors.get(topicId);
+                    if (topicError != null) {
+                        future.completeExceptionally(topicError.exception());
+                        continue;
+                    }
+
+                    Integer authorizedOperations = 
response.topicAuthorizedOperations(topicName).get();
+                    TopicDescription topicDescription = 
getTopicDescriptionFromCluster(cluster, topicName, topicId, 
authorizedOperations);
+                    future.complete(topicDescription);
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(topicFutures.values(), throwable);
+            }
+        };
+        if (!topicIdsList.isEmpty()) {
+            runnable.call(call, now);
+        }
+        return new HashMap<>(topicFutures);
+    }
+
+    private TopicDescription getTopicDescriptionFromCluster(Cluster cluster, 
String topicName, Uuid topicId,
+                                                            Integer 
authorizedOperations) {
+        boolean isInternal = cluster.internalTopics().contains(topicName);
+        List<PartitionInfo> partitionInfos = 
cluster.partitionsForTopic(topicName);
+        List<TopicPartitionInfo> partitions = new 
ArrayList<>(partitionInfos.size());
+        for (PartitionInfo partitionInfo : partitionInfos) {
+            TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(
+                    partitionInfo.partition(), leader(partitionInfo), 
Arrays.asList(partitionInfo.replicas()),
+                    Arrays.asList(partitionInfo.inSyncReplicas()));
+            partitions.add(topicPartitionInfo);
+        }
+        
partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
+        return new TopicDescription(topicName, isInternal, partitions, 
validAclOperations(authorizedOperations), topicId);
+    }
+
+    private Node leader(PartitionInfo partitionInfo) {
+        if (partitionInfo.leader() == null || partitionInfo.leader().id() == 
Node.noNode().id())
+            return null;
+        return partitionInfo.leader();
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
index a2d52e9..e8700d4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java
@@ -72,13 +72,23 @@ public class TopicDescription {
      * @param internal Whether the topic is internal to Kafka
      * @param partitions A list of partitions where the index represents the 
partition id and the element contains
      *                   leadership and replica information for that partition.
-     * @param authorizedOperations authorized operations for this topic, or 
null if this is not known.
+     * @param authorizedOperations authorized operations for this topic, or 
empty set if this is not known.
      */
     public TopicDescription(String name, boolean internal, 
List<TopicPartitionInfo> partitions,
                             Set<AclOperation> authorizedOperations) {
         this(name, internal, partitions, authorizedOperations, Uuid.ZERO_UUID);
     }
 
+    /**
+     * Create an instance with the specified parameters.
+     *
+     * @param name The topic name
+     * @param internal Whether the topic is internal to Kafka
+     * @param partitions A list of partitions where the index represents the 
partition id and the element contains
+     *                   leadership and replica information for that partition.
+     * @param authorizedOperations authorized operations for this topic, or 
empty set if this is not known.
+     * @param topicId the topic id
+     */
     public TopicDescription(String name, boolean internal, 
List<TopicPartitionInfo> partitions,
                             Set<AclOperation> authorizedOperations, Uuid 
topicId) {
         this.name = name;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
index e5124be..42ceeff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java
@@ -17,11 +17,14 @@
 
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.common.Uuid;
+
 /**
  * A listing of a topic in the cluster.
  */
 public class TopicListing {
     private final String name;
+    private final Uuid topicId;
     private final boolean internal;
 
     /**
@@ -29,10 +32,33 @@ public class TopicListing {
      *
      * @param name The topic name
      * @param internal Whether the topic is internal to Kafka
+     * @deprecated Since 3.0 use {@link #TopicListing(String, Uuid, boolean)} 
instead
      */
+    @Deprecated
     public TopicListing(String name, boolean internal) {
         this.name = name;
         this.internal = internal;
+        this.topicId = Uuid.ZERO_UUID;
+    }
+
+    /**
+     * Create an instance with the specified parameters.
+     *
+     * @param name The topic name
+     * @param topicId The topic id.
+     * @param internal Whether the topic is internal to Kafka
+     */
+    public TopicListing(String name, Uuid topicId, boolean internal) {
+        this.topicId = topicId;
+        this.name = name;
+        this.internal = internal;
+    }
+
+    /**
+     * The id of the topic.
+     */
+    public Uuid topicId() {
+        return topicId;
     }
 
     /**
@@ -52,6 +78,6 @@ public class TopicListing {
 
     @Override
     public String toString() {
-        return "(name=" + name + ", internal=" + internal + ")";
+        return "(name=" + name + ", topicId=" + topicId +  ", internal=" + 
internal + ")";
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java 
b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 8c9bbcb..7d3f6f0 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -47,6 +47,7 @@ public final class Cluster {
     private final Map<Integer, Node> nodesById;
     private final ClusterResource clusterResource;
     private final Map<String, Uuid> topicIds;
+    private final Map<Uuid, String> topicNames;
 
     /**
      * Create a new cluster with the given id, nodes and partitions
@@ -184,6 +185,9 @@ public final class Cluster {
         this.availablePartitionsByTopic = 
Collections.unmodifiableMap(tmpAvailablePartitionsByTopic);
         this.partitionsByNode = 
Collections.unmodifiableMap(tmpPartitionsByNode);
         this.topicIds = Collections.unmodifiableMap(topicIds);
+        Map<Uuid, String> tmpTopicNames = new HashMap<>();
+        topicIds.forEach((key, value) -> tmpTopicNames.put(value, key));
+        this.topicNames = Collections.unmodifiableMap(tmpTopicNames);
 
         this.unauthorizedTopics = 
Collections.unmodifiableSet(unauthorizedTopics);
         this.invalidTopics = Collections.unmodifiableSet(invalidTopics);
@@ -354,6 +358,10 @@ public final class Cluster {
         return topicIds.getOrDefault(topic, Uuid.ZERO_UUID);
     }
 
+    public String topicName(Uuid topicId) {
+        return topicNames.get(topicId);
+    }
+
     @Override
     public String toString() {
         return "Cluster(id = " + clusterResource.clusterId() + ", nodes = " + 
this.nodes +
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index d38e9ac..aab5fc6 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -65,6 +65,20 @@ public class MetadataRequest extends AbstractRequest {
             this(topics, allowAutoTopicCreation, 
ApiKeys.METADATA.oldestVersion(),  ApiKeys.METADATA.latestVersion());
         }
 
+        public Builder(List<Uuid> topicIds) {
+            super(ApiKeys.METADATA, ApiKeys.METADATA.oldestVersion(), 
ApiKeys.METADATA.latestVersion());
+            MetadataRequestData data = new MetadataRequestData();
+            if (topicIds == null)
+                data.setTopics(null);
+            else {
+                topicIds.forEach(topicId -> data.topics().add(new 
MetadataRequestTopic().setTopicId(topicId)));
+            }
+
+            // It's impossible to create topic with topicId
+            data.setAllowAutoTopicCreation(false);
+            this.data = data;
+        }
+
         public static Builder allTopics() {
             // This never causes auto-creation, but we set the boolean to true 
because that is the default value when
             // deserializing V2 and older. This way, the value is consistent 
after serialization and deserialization.
@@ -95,10 +109,10 @@ public class MetadataRequest extends AbstractRequest {
                         "allowAutoTopicCreation field");
             if (data.topics() != null) {
                 data.topics().forEach(topic -> {
-                    if (topic.name() == null)
+                    if (topic.name() == null && version < 12)
                         throw new UnsupportedVersionException("MetadataRequest 
version " + version +
                                 " does not support null topic names.");
-                    if (topic.topicId() != Uuid.ZERO_UUID)
+                    if (topic.topicId() != Uuid.ZERO_UUID && version < 12)
                         throw new UnsupportedVersionException("MetadataRequest 
version " + version +
                             " does not support non-zero topic IDs.");
                 });
@@ -147,12 +161,12 @@ public class MetadataRequest extends AbstractRequest {
 
     public boolean isAllTopics() {
         return (data.topics() == null) ||
-            (data.topics().isEmpty() && version() == 0); //In version 0, an 
empty topic list indicates
+            (data.topics().isEmpty() && version() == 0); // In version 0, an 
empty topic list indicates
                                                          // "request metadata 
for all topics."
     }
 
     public List<String> topics() {
-        if (isAllTopics()) //In version 0, we return null for empty topic list
+        if (isAllTopics()) // In version 0, we return null for empty topic list
             return null;
         else
             return data.topics()
@@ -161,6 +175,18 @@ public class MetadataRequest extends AbstractRequest {
                 .collect(Collectors.toList());
     }
 
+    /**
+     * Topic ids of the requested topics; empty for an all-topics request or a version below 10.
+     */
+    public List<Uuid> topicIds() {
+        if (isAllTopics() || version() < 10) {
+            return Collections.emptyList();
+        }
+        return data.topics().stream()
+                .map(MetadataRequestTopic::topicId)
+                .collect(Collectors.toList());
+    }
+
     public boolean allowAutoTopicCreation() {
         return data.allowAutoTopicCreation();
     }
@@ -174,4 +200,10 @@ public class MetadataRequest extends AbstractRequest {
             .setName(topic))
             .collect(Collectors.toList());
     }
+
+    /** Wraps each topic id in a MetadataRequestTopic that has only its topic id set. */
+    public static List<MetadataRequestTopic> convertTopicIdsToMetadataRequestTopic(final Collection<Uuid> topicIds) {
+        return topicIds.stream()
+                .map(id -> new MetadataRequestTopic().setTopicId(id)).collect(Collectors.toList());
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index c85b31d..d539fa8 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -91,12 +91,31 @@ public class MetadataResponse extends AbstractResponse {
     public Map<String, Errors> errors() {
         Map<String, Errors> errors = new HashMap<>();
         for (MetadataResponseTopic metadata : data.topics()) {
+            if (metadata.name() == null) {
+                throw new IllegalStateException("Use errorsByTopicId() when 
managing topic using topic id");
+            }
             if (metadata.errorCode() != Errors.NONE.code())
                 errors.put(metadata.name(), 
Errors.forCode(metadata.errorCode()));
         }
         return errors;
     }
 
+    /**
+     * Get a map of the topic ids which had metadata errors. Fails with IllegalStateException on a zero topic id.
+     * @return the map from topic id to its error; topics whose error is NONE are left out
+     */
+    public Map<Uuid, Errors> errorsByTopicId() {
+        Map<Uuid, Errors> errors = new HashMap<>();
+        for (MetadataResponseTopic metadata : data.topics()) {
+            if (Uuid.ZERO_UUID.equals(metadata.topicId())) { // by value: the id may be a distinct instance equal to ZERO_UUID
+                throw new IllegalStateException("Use errors() when managing topic using topic name");
+            }
+            if (metadata.errorCode() != Errors.NONE.code())
+                errors.put(metadata.topicId(), Errors.forCode(metadata.errorCode()));
+        }
+        return errors;
+    }
+
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> errorCounts = new HashMap<>();
diff --git a/clients/src/main/resources/common/message/MetadataRequest.json 
b/clients/src/main/resources/common/message/MetadataRequest.json
index 2d88a0d..5da95cf 100644
--- a/clients/src/main/resources/common/message/MetadataRequest.json
+++ b/clients/src/main/resources/common/message/MetadataRequest.json
@@ -18,7 +18,7 @@
   "type": "request",
   "listeners": ["zkBroker", "broker"],
   "name": "MetadataRequest",
-  "validVersions": "0-11",
+  "validVersions": "0-12",
   "flexibleVersions": "9+",
   "fields": [
     // In version 0, an empty array indicates "request metadata for all 
topics."  In version 1 and
@@ -38,6 +38,7 @@
     //
     // Version 11 deprecates IncludeClusterAuthorizedOperations field. This is 
now exposed
     // by the DescribeCluster API (KIP-700).
+    // Version 12 supports requesting topic metadata by topic ID.
     { "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+", 
"nullableVersions": "1+",
       "about": "The topics to fetch metadata for.", "fields": [
       { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": 
true, "about": "The topic id." },
diff --git a/clients/src/main/resources/common/message/MetadataResponse.json 
b/clients/src/main/resources/common/message/MetadataResponse.json
index d005294..714b28b 100644
--- a/clients/src/main/resources/common/message/MetadataResponse.json
+++ b/clients/src/main/resources/common/message/MetadataResponse.json
@@ -41,7 +41,8 @@
   //
   // Version 11 deprecates ClusterAuthorizedOperations. This is now exposed
   // by the DescribeCluster API (KIP-700).
-  "validVersions": "0-11",
+  // Version 12 supports topic IDs; the topic name is nullable from this version on.
+  "validVersions": "0-12",
   "flexibleVersions": "9+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", 
"ignorable": true,
@@ -65,7 +66,7 @@
       "about": "Each topic in the response.", "fields": [
       { "name": "ErrorCode", "type": "int16", "versions": "0+",
         "about": "The topic error, or 0 if there was no error." },
-      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, 
"entityType": "topicName",
+      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, 
"entityType": "topicName", "nullableVersions": "12+",
         "about": "The topic name." },
       { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": 
true, "about": "The topic id." },
       { "name": "IsInternal", "type": "bool", "versions": "1+", "default": 
"false", "ignorable": true,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index 019566a..3db5739 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -26,6 +26,7 @@ import 
org.apache.kafka.clients.admin.internals.MetadataOperationContext;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 
 public class AdminClientTestUtils {
@@ -70,7 +71,7 @@ public class AdminClientTestUtils {
      */
     public static ListTopicsResult listTopicsResult(String topic) {
         KafkaFutureImpl<Map<String, TopicListing>> future = new 
KafkaFutureImpl<>();
-        future.complete(Collections.singletonMap(topic, new 
TopicListing(topic, false)));
+        future.complete(Collections.singletonMap(topic, new 
TopicListing(topic, Uuid.ZERO_UUID, false)));
         return new ListTopicsResult(future);
     }
 
@@ -93,11 +94,11 @@ public class AdminClientTestUtils {
     public static DescribeTopicsResult describeTopicsResult(String topic, 
TopicDescription description) {
         KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
         future.complete(description);
-        return new DescribeTopicsResult(Collections.singletonMap(topic, 
future));
+        return 
DescribeTopicsResult.ofTopicNames(Collections.singletonMap(topic, future));
     }
 
     public static DescribeTopicsResult describeTopicsResult(Map<String, 
TopicDescription> topicDescriptions) {
-        return new DescribeTopicsResult(topicDescriptions.entrySet().stream()
+        return 
DescribeTopicsResult.ofTopicNames(topicDescriptions.entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> 
KafkaFuture.completedFuture(e.getValue()))));
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index f1ace85..46542db 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -1203,7 +1203,7 @@ public class KafkaAdminClientTest {
             assertEquals(0, env.kafkaClient().inFlightRequestCount());
 
             Map<String, KafkaFuture<TopicDescription>> describeFutures =
-                    env.adminClient().describeTopics(sillyTopicNames).values();
+                    
env.adminClient().describeTopics(sillyTopicNames).topicNameValues();
             for (String sillyTopicName : sillyTopicNames) {
                 
TestUtils.assertFutureError(describeFutures.get(sillyTopicName), 
InvalidTopicException.class);
             }
@@ -1255,7 +1255,7 @@ public class KafkaAdminClientTest {
                             singletonList(partitionMetadata), 
MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED))));
 
             DescribeTopicsResult result = 
env.adminClient().describeTopics(singleton(topic));
-            Map<String, TopicDescription> topicDescriptions = 
result.all().get();
+            Map<String, TopicDescription> topicDescriptions = 
result.allTopicNames().get();
             assertEquals(leader, 
topicDescriptions.get(topic).partitions().get(0).leader());
             assertNull(topicDescriptions.get(topic).authorizedOperations());
         }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 2ef4a26..473edae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.errors.KafkaStorageException;
 import org.apache.kafka.common.errors.ReplicaNotAvailableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnknownTopicIdException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
@@ -199,10 +200,18 @@ public class MockAdminClient extends AdminClient {
         this.controller = controller;
     }
 
-    synchronized public void addTopic(boolean internal,
+    public void addTopic(boolean internal,
                          String name,
                          List<TopicPartitionInfo> partitions,
                          Map<String, String> configs) {
+        addTopic(internal, name, partitions, configs, true);
+    }
+
+    synchronized public void addTopic(boolean internal,
+                                      String name,
+                                      List<TopicPartitionInfo> partitions,
+                                      Map<String, String> configs,
+                                      boolean usesTopicId) {
         if (allTopics.containsKey(name)) {
             throw new IllegalArgumentException(String.format("Topic %s was 
already added.", name));
         }
@@ -223,10 +232,15 @@ public class MockAdminClient extends AdminClient {
                 logDirs.add(brokerLogDirs.get(partition.leader().id()).get(0));
             }
         }
-        allTopics.put(name, new TopicMetadata(internal, partitions, logDirs, 
configs));
-        Uuid id = Uuid.randomUuid();
-        topicIds.put(name, id);
-        topicNames.put(id, name);
+        Uuid topicId;
+        if (usesTopicId) {
+            topicId = Uuid.randomUuid();
+            topicIds.put(name, topicId);
+            topicNames.put(topicId, name);
+        } else {
+            topicId = Uuid.ZERO_UUID;
+        }
+        allTopics.put(name, new TopicMetadata(topicId, internal, partitions, 
logDirs, configs));
     }
 
     synchronized public void markTopicForDeletion(final String name) {
@@ -317,10 +331,10 @@ public class MockAdminClient extends AdminClient {
                 partitions.add(new TopicPartitionInfo(i, brokers.get(0), 
replicas, Collections.emptyList()));
                 
logDirs.add(brokerLogDirs.get(partitions.get(i).leader().id()).get(0));
             }
-            allTopics.put(topicName, new TopicMetadata(false, partitions, 
logDirs, newTopic.configs()));
-            Uuid id = Uuid.randomUuid();
-            topicIds.put(topicName, id);
-            topicNames.put(id, topicName);
+            Uuid topicId = Uuid.randomUuid();
+            topicIds.put(topicName, topicId);
+            topicNames.put(topicId, topicName);
+            allTopics.put(topicName, new TopicMetadata(topicId, false, 
partitions, logDirs, newTopic.configs()));
             future.complete(null);
             createTopicResult.put(topicName, future);
         }
@@ -345,7 +359,7 @@ public class MockAdminClient extends AdminClient {
             if (topicDescription.getValue().fetchesRemainingUntilVisible > 0) {
                 topicDescription.getValue().fetchesRemainingUntilVisible--;
             } else {
-                topicListings.put(topicName, new TopicListing(topicName, 
topicDescription.getValue().isInternalTopic));
+                topicListings.put(topicName, new TopicListing(topicName, 
topicDescription.getValue().topicId, 
topicDescription.getValue().isInternalTopic));
             }
         }
 
@@ -355,7 +369,16 @@ public class MockAdminClient extends AdminClient {
     }
 
     @Override
-    synchronized public DescribeTopicsResult describeTopics(Collection<String> 
topicNames, DescribeTopicsOptions options) {
+    synchronized public DescribeTopicsResult describeTopics(TopicCollection 
topics, DescribeTopicsOptions options) {
+        if (topics instanceof TopicIdCollection)
+            return DescribeTopicsResult.ofTopicIds(new 
HashMap<>(handleDescribeTopicsUsingIds(((TopicIdCollection) topics).topicIds(), 
options)));
+        else if (topics instanceof TopicNameCollection)
+            return DescribeTopicsResult.ofTopicNames(new 
HashMap<>(handleDescribeTopicsByNames(((TopicNameCollection) 
topics).topicNames(), options)));
+        else
+            throw new IllegalArgumentException("The TopicCollection provided 
did not match any supported classes for describeTopics.");
+    }
+
+    private Map<String, KafkaFuture<TopicDescription>> 
handleDescribeTopicsByNames(Collection<String> topicNames, 
DescribeTopicsOptions options) {
         Map<String, KafkaFuture<TopicDescription>> topicDescriptions = new 
HashMap<>();
 
         if (timeoutNextRequests > 0) {
@@ -366,20 +389,20 @@ public class MockAdminClient extends AdminClient {
             }
 
             --timeoutNextRequests;
-            return new DescribeTopicsResult(topicDescriptions);
+            return topicDescriptions;
         }
 
         for (String requestedTopic : topicNames) {
             for (Map.Entry<String, TopicMetadata> topicDescription : 
allTopics.entrySet()) {
                 String topicName = topicDescription.getKey();
+                Uuid topicId = topicIds.getOrDefault(topicName, 
Uuid.ZERO_UUID);
                 if (topicName.equals(requestedTopic) && 
!topicDescription.getValue().markedForDeletion) {
                     if 
(topicDescription.getValue().fetchesRemainingUntilVisible > 0) {
                         
topicDescription.getValue().fetchesRemainingUntilVisible--;
                     } else {
                         TopicMetadata topicMetadata = 
topicDescription.getValue();
                         KafkaFutureImpl<TopicDescription> future = new 
KafkaFutureImpl<>();
-                        future.complete(new TopicDescription(topicName, 
topicMetadata.isInternalTopic, topicMetadata.partitions,
-                                Collections.emptySet()));
+                        future.complete(new TopicDescription(topicName, 
topicMetadata.isInternalTopic, topicMetadata.partitions, 
Collections.emptySet(), topicId));
                         topicDescriptions.put(topicName, future);
                         break;
                     }
@@ -392,7 +415,49 @@ public class MockAdminClient extends AdminClient {
             }
         }
 
-        return new DescribeTopicsResult(topicDescriptions);
+        return topicDescriptions;
+    }
+
+    /**
+     * Describes topics by id. While timeoutNextRequests is positive every id fails with a TimeoutException; otherwise
+     * an id yields its topic's description, or an UnknownTopicIdException if it is unknown, marked for deletion or not yet visible.
+     */
+    synchronized public Map<Uuid, KafkaFuture<TopicDescription>> handleDescribeTopicsUsingIds(Collection<Uuid> topicIds, DescribeTopicsOptions options) {
+        Map<Uuid, KafkaFuture<TopicDescription>> topicDescriptions = new HashMap<>();
+        if (timeoutNextRequests > 0) {
+            for (Uuid requestedTopicId : topicIds) {
+                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new TimeoutException());
+                topicDescriptions.put(requestedTopicId, future);
+            }
+            --timeoutNextRequests;
+            return topicDescriptions;
+        }
+
+        for (Uuid requestedTopicId : topicIds) {
+            for (Map.Entry<String, TopicMetadata> topicDescription : allTopics.entrySet()) {
+                String topicName = topicDescription.getKey();
+                // The parameter shadows the field, so the name-to-id map needs an explicit this.
+                Uuid topicId = this.topicIds.get(topicName);
+                if (topicId != null && topicId.equals(requestedTopicId) && !topicDescription.getValue().markedForDeletion) {
+                    if (topicDescription.getValue().fetchesRemainingUntilVisible > 0) {
+                        topicDescription.getValue().fetchesRemainingUntilVisible--;
+                    } else {
+                        TopicMetadata topicMetadata = topicDescription.getValue();
+                        KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+                        future.complete(new TopicDescription(topicName, topicMetadata.isInternalTopic, topicMetadata.partitions, Collections.emptySet(), topicId));
+                        topicDescriptions.put(requestedTopicId, future);
+                        break;
+                    }
+                }
+            }
+            if (!topicDescriptions.containsKey(requestedTopicId)) {
+                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new UnknownTopicIdException("Topic id " + requestedTopicId + " not found."));
+                topicDescriptions.put(requestedTopicId, future);
+            }
+        }
+        return topicDescriptions;
     }
 
     @Override
@@ -948,6 +1013,7 @@ public class MockAdminClient extends AdminClient {
     }
 
     private final static class TopicMetadata {
+        final Uuid topicId;
         final boolean isInternalTopic;
         final List<TopicPartitionInfo> partitions;
         final List<String> partitionLogDirs;
@@ -956,10 +1022,12 @@ public class MockAdminClient extends AdminClient {
 
         public boolean markedForDeletion;
 
-        TopicMetadata(boolean isInternalTopic,
+        TopicMetadata(Uuid topicId,
+                      boolean isInternalTopic,
                       List<TopicPartitionInfo> partitions,
                       List<String> partitionLogDirs,
                       Map<String, String> configs) {
+            this.topicId = topicId;
             this.isInternalTopic = isInternalTopic;
             this.partitions = partitions;
             this.partitionLogDirs = partitionLogDirs;
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
index a45cf52..0a0466f 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
@@ -116,7 +116,7 @@ public class ClientAuthenticationFailureTest {
         Map<String, Object> props = new HashMap<>(saslClientConfigs);
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + 
server.port());
         try (Admin client = Admin.create(props)) {
-            KafkaFuture<Map<String, TopicDescription>> future = 
client.describeTopics(Collections.singleton("test")).all();
+            KafkaFuture<Map<String, TopicDescription>> future = 
client.describeTopics(Collections.singleton("test")).allTopicNames();
             TestUtils.assertFutureThrows(future, 
SaslAuthenticationException.class);
         }
     }
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index fd6bf9f..0e3ccc9 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -399,7 +399,7 @@ public class MirrorSourceConnector extends SourceConnector {
 
     private static Collection<TopicDescription> describeTopics(AdminClient 
adminClient, Collection<String> topics)
             throws InterruptedException, ExecutionException {
-        return adminClient.describeTopics(topics).all().get().values();
+        return 
adminClient.describeTopics(topics).allTopicNames().get().values();
     }
 
     static Map<String, String> configToMap(Config config) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index 9661c69..7b2f152 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -452,7 +452,7 @@ public class TopicAdmin implements AutoCloseable {
         String topicNameList = String.join(", ", topics);
 
         Map<String, KafkaFuture<TopicDescription>> newResults =
-                admin.describeTopics(Arrays.asList(topics), new 
DescribeTopicsOptions()).values();
+                admin.describeTopics(Arrays.asList(topics), new 
DescribeTopicsOptions()).topicNameValues();
 
         // Iterate over each future so that we can handle individual failures 
like when some topics don't exist
         Map<String, TopicDescription> existingTopics = new HashMap<>();
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index edd9891..dc25129 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -797,7 +797,7 @@ public class TopicAdminTest {
     protected TopicDescription topicDescription(MockAdminClient admin, String 
topicName)
             throws ExecutionException, InterruptedException {
         DescribeTopicsResult result = 
admin.describeTopics(Collections.singleton(topicName));
-        Map<String, KafkaFuture<TopicDescription>> byName = result.values();
+        Map<String, KafkaFuture<TopicDescription>> byName = 
result.topicNameValues();
         return byName.get(topicName).get();
     }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index db6f67e..17fd1ac 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -305,7 +305,7 @@ public class EmbeddedKafkaCluster {
         log.info("Describing topics {}", topicNames);
         try (Admin admin = createAdminClient()) {
             DescribeTopicsResult result = admin.describeTopics(topicNames);
-            Map<String, KafkaFuture<TopicDescription>> byName = 
result.values();
+            Map<String, KafkaFuture<TopicDescription>> byName = 
result.topicNameValues();
             for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : 
byName.entrySet()) {
                 String topicName = entry.getKey();
                 try {
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 74b7224..47c1d17 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -465,7 +465,7 @@ object ConsumerGroupCommand extends Logging {
         topicWithoutPartitions.asJava,
         withTimeoutMs(new DescribeTopicsOptions))
 
-      val unknownPartitions = describeTopicsResult.values().asScala.flatMap { 
case (topic, future) =>
+      val unknownPartitions = 
describeTopicsResult.topicNameValues().asScala.flatMap { case (topic, future) =>
         Try(future.get()) match {
           case Success(description) => description.partitions().asScala.map { 
partition =>
             new TopicPartition(topic, partition.partition())
@@ -726,7 +726,7 @@ object ConsumerGroupCommand extends Logging {
         val descriptionMap = adminClient.describeTopics(
           topics.asJava,
           withTimeoutMs(new DescribeTopicsOptions)
-        ).all().get.asScala
+        ).allTopicNames().get.asScala
         descriptionMap.flatMap { case (topic, description) =>
           description.partitions().asScala.map { tpInfo =>
             new TopicPartition(topic, tpInfo.partition)
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 186a431..ac6304b 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -376,7 +376,7 @@ object ReassignPartitionsCommand extends Logging {
         topicNamesToLookUp.add(part.topic)
     }
     val topicDescriptions = adminClient.
-      describeTopics(topicNamesToLookUp.asJava).values().asScala
+      describeTopics(topicNamesToLookUp.asJava).topicNameValues().asScala
     val notFoundResults = notFoundReassignments.map {
       case (part, targetReplicas) =>
         currentReassignments.get(part) match {
@@ -618,7 +618,7 @@ object ReassignPartitionsCommand extends Logging {
   private def describeTopics(adminClient: Admin,
                              topics: Set[String])
                              : Map[String, TopicDescription] = {
-    adminClient.describeTopics(topics.asJava).values.asScala.map { case 
(topicName, topicDescriptionFuture) =>
+    adminClient.describeTopics(topics.asJava).topicNameValues().asScala.map { 
case (topicName, topicDescriptionFuture) =>
       try topicName -> topicDescriptionFuture.get
       catch {
         case t: ExecutionException if 
t.getCause.isInstanceOf[UnknownTopicOrPartitionException] =>
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala 
b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 8ba2e68..f7b1c87 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.clients.admin.CreatePartitionsOptions
 import org.apache.kafka.clients.admin.CreateTopicsOptions
 import org.apache.kafka.clients.admin.DeleteTopicsOptions
 import org.apache.kafka.clients.admin.{Admin, ListTopicsOptions, 
NewPartitions, NewTopic, PartitionReassignment, Config => JConfig}
-import org.apache.kafka.common.{TopicPartition, TopicPartitionInfo, Uuid}
+import org.apache.kafka.common.{TopicCollection, TopicPartition, 
TopicPartitionInfo, Uuid}
 import org.apache.kafka.common.config.ConfigResource.Type
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, 
TopicExistsException, UnsupportedVersionException}
@@ -264,7 +264,7 @@ object TopicCommand extends Logging {
       ensureTopicExists(topics, opts.topic, !opts.ifExists)
 
       if (topics.nonEmpty) {
-        val topicsInfo = 
adminClient.describeTopics(topics.asJavaCollection).values()
+        val topicsInfo = 
adminClient.describeTopics(topics.asJavaCollection).topicNameValues()
         val newPartitions = topics.map { topicName =>
           if (topic.hasReplicaAssignment) {
             val startPartitionId = 
topicsInfo.get(topicName).get().partitions().size()
@@ -297,42 +297,60 @@ object TopicCommand extends Logging {
     }
 
     def describeTopic(opts: TopicCommandOptions): Unit = {
-      val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic, !opts.ifExists)
+      // If topicId is provided and not zero, will use topicId regardless of 
topic name
+      val inputTopicId = opts.topicId.map(Uuid.fromString).filter(uuid => uuid 
!= Uuid.ZERO_UUID)
+      val useTopicId = inputTopicId.nonEmpty
 
-      if (topics.nonEmpty) {
-        val allConfigs = adminClient.describeConfigs(topics.map(new 
ConfigResource(Type.TOPIC, _)).asJavaCollection).values()
-        val liveBrokers = 
adminClient.describeCluster().nodes().get().asScala.map(_.id())
-        val topicDescriptions = 
adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala
-        val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
-        val topicPartitions = topicDescriptions
-          .flatMap(td => td.partitions.iterator().asScala.map(p => new 
TopicPartition(td.name(), p.partition())))
-          .toSet.asJava
-        val reassignments = listAllReassignments(topicPartitions)
-
-        for (td <- topicDescriptions) {
-          val topicName = td.name
-          val topicId = td.topicId()
-          val config = allConfigs.get(new ConfigResource(Type.TOPIC, 
topicName)).get()
-          val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
-
-          if (describeOptions.describeConfigs) {
-            val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
-            if (!opts.reportOverriddenConfigs || hasNonDefault) {
-              val numPartitions = td.partitions().size
-              val firstPartition = td.partitions.iterator.next()
-              val reassignment = reassignments.get(new TopicPartition(td.name, 
firstPartition.partition))
-              val topicDesc = TopicDescription(topicName, topicId, 
numPartitions, getReplicationFactor(firstPartition, reassignment), config, 
markedForDeletion = false)
-              topicDesc.printDescription()
-            }
+      val (topicIds, topics) = if (useTopicId)
+        (getTopicIds(inputTopicId, opts.excludeInternalTopics), Seq())
+      else
+        (Seq(), getTopics(opts.topic, opts.excludeInternalTopics))
+
+      // Only check topic name when topicId is not provided
+      if (useTopicId)
+        ensureTopicIdExists(topicIds, inputTopicId, !opts.ifExists)
+      else
+        ensureTopicExists(topics, opts.topic, !opts.ifExists)
+
+      val topicDescriptions = if (topicIds.nonEmpty) {
+        
adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds.toSeq.asJavaCollection)).allTopicIds().get().values().asScala
+      } else if (topics.nonEmpty) {
+        
adminClient.describeTopics(TopicCollection.ofTopicNames(topics.asJavaCollection)).allTopicNames().get().values().asScala
+      } else {
+        Seq()
+      }
+
+      val topicNames = topicDescriptions.map(_.name())
+      val allConfigs = adminClient.describeConfigs(topicNames.map(new 
ConfigResource(Type.TOPIC, _)).asJavaCollection).values()
+      val liveBrokers = 
adminClient.describeCluster().nodes().get().asScala.map(_.id())
+      val describeOptions = new DescribeOptions(opts, liveBrokers.toSet)
+      val topicPartitions = topicDescriptions
+        .flatMap(td => td.partitions.iterator().asScala.map(p => new 
TopicPartition(td.name(), p.partition())))
+        .toSet.asJava
+      val reassignments = listAllReassignments(topicPartitions)
+
+      for (td <- topicDescriptions) {
+        val topicName = td.name
+        val topicId = td.topicId()
+        val config = allConfigs.get(new ConfigResource(Type.TOPIC, 
topicName)).get()
+        val sortedPartitions = td.partitions.asScala.sortBy(_.partition)
+
+        if (describeOptions.describeConfigs) {
+          val hasNonDefault = config.entries().asScala.exists(!_.isDefault)
+          if (!opts.reportOverriddenConfigs || hasNonDefault) {
+            val numPartitions = td.partitions().size
+            val firstPartition = td.partitions.iterator.next()
+            val reassignment = reassignments.get(new TopicPartition(td.name, 
firstPartition.partition))
+            val topicDesc = TopicDescription(topicName, topicId, 
numPartitions, getReplicationFactor(firstPartition, reassignment), config, 
markedForDeletion = false)
+            topicDesc.printDescription()
           }
+        }
 
-          if (describeOptions.describePartitions) {
-            for (partition <- sortedPartitions) {
-              val reassignment = reassignments.get(new TopicPartition(td.name, 
partition.partition))
-              val partitionDesc = PartitionDescription(topicName, partition, 
Some(config), markedForDeletion = false, reassignment)
-              describeOptions.maybePrintPartitionDescription(partitionDesc)
-            }
+        if (describeOptions.describePartitions) {
+          for (partition <- sortedPartitions) {
+            val reassignment = reassignments.get(new TopicPartition(td.name, 
partition.partition))
+            val partitionDesc = PartitionDescription(topicName, partition, 
Some(config), markedForDeletion = false, reassignment)
+            describeOptions.maybePrintPartitionDescription(partitionDesc)
           }
         }
       }
@@ -345,13 +363,23 @@ object TopicCommand extends Logging {
         .all().get()
     }
 
-    def getTopics(topicIncludelist: Option[String], excludeInternalTopics: 
Boolean = false): Seq[String] = {
+    def getTopics(topicIncludeList: Option[String], excludeInternalTopics: 
Boolean = false): Seq[String] = {
+      val allTopics = if (excludeInternalTopics) {
+        adminClient.listTopics()
+      } else {
+        adminClient.listTopics(new ListTopicsOptions().listInternal(true))
+      }
+      doGetTopics(allTopics.names().get().asScala.toSeq.sorted, 
topicIncludeList, excludeInternalTopics)
+    }
+
+    def getTopicIds(topicIdIncludeList: Option[Uuid], excludeInternalTopics: 
Boolean = false): Seq[Uuid] = {
       val allTopics = if (excludeInternalTopics) {
         adminClient.listTopics()
       } else {
         adminClient.listTopics(new ListTopicsOptions().listInternal(true))
       }
-      doGetTopics(allTopics.names().get().asScala.toSeq.sorted, 
topicIncludelist, excludeInternalTopics)
+      val allTopicIds = 
allTopics.listings().get().asScala.map(_.topicId()).toSeq.sorted
+      topicIdIncludeList.filter(allTopicIds.contains).toSeq
     }
 
     def close(): Unit = adminClient.close()
@@ -374,6 +402,23 @@ object TopicCommand extends Logging {
     }
   }
 
+  /**
+   * Ensures that the requested topic id exists, throwing an exception when it is required but was not found.
+   *
+   * @param foundTopicIds Topics that were found to match the requested topic 
id.
+   * @param requestedTopicId Id of the topic that was requested.
+   * @param requireTopicIdExists Indicates if the topic needs to exist for the 
operation to be successful.
+   *                             If set to true, the command will throw an 
exception if the topic with the
+   *                             requested id does not exist.
+   */
+  private def ensureTopicIdExists(foundTopicIds: Seq[Uuid], requestedTopicId: 
Option[Uuid], requireTopicIdExists: Boolean): Unit = {
+    // If no topic id was requested, there is nothing to check and no exception is thrown.
+    if (requestedTopicId.isDefined && requireTopicIdExists && 
foundTopicIds.isEmpty) {
+      // If given topicId doesn't exist then throw exception
+      throw new IllegalArgumentException(s"TopicId '${requestedTopicId.get}' 
does not exist as expected")
+    }
+  }
+
   private def doGetTopics(allTopics: Seq[String], topicIncludeList: 
Option[String], excludeInternalTopics: Boolean): Seq[String] = {
     if (topicIncludeList.isDefined) {
       val topicsFilter = IncludeList(topicIncludeList.get)
@@ -464,6 +509,11 @@ object TopicCommand extends Logging {
                          .withRequiredArg
                          .describedAs("topic")
                          .ofType(classOf[String])
+    private val topicIdOpt = parser.accepts("topic-id", "The topic-id to 
describe." +
+      "This is used only with --bootstrap-server option for describing 
topics.")
+      .withRequiredArg
+      .describedAs("topic-id")
+      .ofType(classOf[String])
     private val nl = System.getProperty("line.separator")
     private val kafkaConfigsCanAlterTopicConfigsViaBootstrapServer =
       " (the kafka-configs CLI supports altering topic configs with a 
--bootstrap-server option)"
@@ -533,6 +583,7 @@ object TopicCommand extends Logging {
     def bootstrapServer: Option[String] = valueAsOption(bootstrapServerOpt)
     def commandConfig: Properties = if (has(commandConfigOpt)) 
Utils.loadProps(options.valueOf(commandConfigOpt)) else new Properties()
     def topic: Option[String] = valueAsOption(topicOpt)
+    def topicId: Option[String] = valueAsOption(topicIdOpt)
     def partitions: Option[Integer] = valueAsOption(partitionsOpt)
     def replicationFactor: Option[Integer] = 
valueAsOption(replicationFactorOpt)
     def replicaAssignment: Option[Map[Int, List[Int]]] =
@@ -566,8 +617,12 @@ object TopicCommand extends Logging {
       // check required args
       if (!has(bootstrapServerOpt))
         throw new IllegalArgumentException("--bootstrap-server must be 
specified")
-      if (has(describeOpt) && has(ifExistsOpt))
-        CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
+      if (has(describeOpt) && has(ifExistsOpt)) {
+        if (!has(topicOpt) && !has(topicIdOpt))
+          CommandLineUtils.printUsageAndDie(parser, "--topic or --topic-id is 
required to describe a topic")
+        if (has(topicOpt) && has(topicIdOpt))
+          println("Only topic id will be used when both --topic and --topic-id 
are specified and topicId is not Uuid.ZERO_UUID")
+      }
       if (!has(listOpt) && !has(describeOpt))
         CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
       if (has(createOpt) && !has(replicaAssignmentOpt))
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 144414d..c2e0c15 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1133,11 +1133,13 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   private def metadataResponseTopic(error: Errors,
                                     topic: String,
+                                    topicId: Uuid,
                                     isInternal: Boolean,
                                     partitionData: 
util.List[MetadataResponsePartition]): MetadataResponseTopic = {
     new MetadataResponseTopic()
       .setErrorCode(error.code)
       .setName(topic)
+      .setTopicId(topicId)
       .setIsInternal(isInternal)
       .setPartitions(partitionData)
   }
@@ -1174,6 +1176,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           metadataResponseTopic(
             error,
             topic,
+            metadataCache.getTopicId(topic),
             Topic.isInternal(topic),
             util.Collections.emptyList()
           )
@@ -1191,16 +1194,29 @@ class KafkaApis(val requestChannel: RequestChannel,
     // Topic IDs are not supported for versions 10 and 11. Topic names can not 
be null in these versions.
     if (!metadataRequest.isAllTopics) {
       metadataRequest.data.topics.forEach{ topic =>
-        if (topic.name == null) {
+        if (topic.name == null && metadataRequest.version < 12) {
           throw new InvalidRequestException(s"Topic name can not be null for 
version ${metadataRequest.version}")
-        } else if (topic.topicId != Uuid.ZERO_UUID) {
+        } else if (topic.topicId != Uuid.ZERO_UUID && metadataRequest.version 
< 12) {
           throw new InvalidRequestException(s"Topic IDs are not supported in 
requests for version ${metadataRequest.version}")
         }
       }
     }
 
+    // First, check whether the request specifies any (non-zero) topic IDs.
+    val topicIds = metadataRequest.topicIds.asScala.toSet.filterNot(_ == 
Uuid.ZERO_UUID)
+    val useTopicId = topicIds.nonEmpty
+
+    // Resolve the requested topic IDs: collect the unknown IDs and the names of the known ones
+    val unknownTopicIds = 
topicIds.filter(metadataCache.getTopicName(_).isEmpty)
+    val knownTopicNames = topicIds.flatMap(metadataCache.getTopicName)
+
+    val unknownTopicIdsTopicMetadata = unknownTopicIds.map(topicId =>
+        metadataResponseTopic(Errors.UNKNOWN_TOPIC_ID, null, topicId, false, 
util.Collections.emptyList())).toSeq
+
     val topics = if (metadataRequest.isAllTopics)
       metadataCache.getAllTopics()
+    else if (useTopicId)
+      knownTopicNames
     else
       metadataRequest.topics.asScala.toSet
 
@@ -1222,16 +1238,23 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     val unauthorizedForCreateTopicMetadata = 
unauthorizedForCreateTopics.map(topic =>
-      metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, 
isInternal(topic), util.Collections.emptyList()))
+      // Set topicId to zero since we will never create a topic that is requested by topicId
+      metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, 
Uuid.ZERO_UUID, isInternal(topic), util.Collections.emptyList()))
 
     // do not disclose the existence of topics unauthorized for Describe, so 
we've not even checked if they exist or not
     val unauthorizedForDescribeTopicMetadata =
       // In case of all topics, don't include topics unauthorized for Describe
       if ((requestVersion == 0 && (metadataRequest.topics == null || 
metadataRequest.topics.isEmpty)) || metadataRequest.isAllTopics)
         Set.empty[MetadataResponseTopic]
-      else
+      else if (useTopicId) {
+        // Topic IDs are not considered sensitive information, so returning 
TOPIC_AUTHORIZATION_FAILED is OK
         unauthorizedForDescribeTopics.map(topic =>
-          metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, 
false, util.Collections.emptyList()))
+          metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, null, 
metadataCache.getTopicId(topic), false, util.Collections.emptyList()))
+      } else {
+        // We should not return the topicId on an authorization error, so we 
return the zero UUID.
+        unauthorizedForDescribeTopics.map(topic =>
+          metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, 
Uuid.ZERO_UUID, false, util.Collections.emptyList()))
+      }
 
     // In version 0, we returned an error when brokers with replicas were 
unavailable,
     // while in higher versions we simply don't include the broker in the 
returned broker list
@@ -1267,7 +1290,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val completeTopicMetadata = topicMetadata ++ 
unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata
+    val completeTopicMetadata =  unknownTopicIdsTopicMetadata ++
+      topicMetadata ++ unauthorizedForCreateTopicMetadata ++ 
unauthorizedForDescribeTopicMetadata
 
     val brokers = 
metadataCache.getAliveBrokerNodes(request.context.listenerName)
 
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala 
b/core/src/main/scala/kafka/server/MetadataCache.scala
index a6d5623..67285d7 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -54,6 +54,10 @@ trait MetadataCache {
 
   def getAliveBrokers(): Iterable[BrokerMetadata]
 
+  def getTopicId(topicName: String): Uuid
+
+  def getTopicName(topicId: Uuid): Option[String]
+
   def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): 
Option[Node]
 
   def getAliveBrokerNodes(listenerName: ListenerName): Iterable[Node]
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala 
b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 6ae1922..b7fbd17 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -193,6 +193,10 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
     }
   }
 
+  override def getTopicId(topicName: String): Uuid = 
_currentImage.topics().topicsByName().asScala.get(topicName).map(_.id()).getOrElse(Uuid.ZERO_UUID)
+
+  override def getTopicName(topicId: Uuid): Option[String] = 
_currentImage.topics().topicsById.asScala.get(topicId).map(_.name())
+
   override def hasAliveBroker(brokerId: Int): Boolean = {
     Option(_currentImage.cluster().broker(brokerId)).count(!_.fenced()) == 1
   }
diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala 
b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
index 71bfdf3..83b5eed 100755
--- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -234,6 +234,14 @@ class ZkMetadataCache(brokerId: Int) extends MetadataCache 
with Logging {
     metadataSnapshot.aliveBrokers.values.flatMap(_.getNode(listenerName))
   }
 
+  def getTopicId(topicName: String): Uuid = {
+    metadataSnapshot.topicIds.getOrElse(topicName, Uuid.ZERO_UUID)
+  }
+
+  def getTopicName(topicId: Uuid): Option[String] = {
+    metadataSnapshot.topicNames.get(topicId)
+  }
+
   private def addOrUpdatePartitionInfo(partitionStates: 
mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
                                        topic: String,
                                        partitionId: Int,
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 82bc477..b2bece4 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -221,7 +221,7 @@ object ReplicaVerificationTool extends Logging {
 
   private def listTopicsMetadata(adminClient: Admin): Seq[TopicDescription] = {
     val topics = adminClient.listTopics(new 
ListTopicsOptions().listInternal(true)).names.get
-    adminClient.describeTopics(topics).all.get.values.asScala.toBuffer
+    
adminClient.describeTopics(topics).allTopicNames.get.values.asScala.toBuffer
   }
 
   private def brokerDetails(adminClient: Admin): Map[Int, Node] = {
diff --git 
a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
index b9aa208..7525cd2 100644
--- a/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala
@@ -110,7 +110,7 @@ abstract class BaseAdminIntegrationTest extends 
IntegrationTestHarness with Logg
     
assertFutureExceptionTypeEquals(failedCreateResult.replicationFactor("mytopic3"),
 classOf[TopicExistsException])
     assertFutureExceptionTypeEquals(failedCreateResult.config("mytopic3"), 
classOf[TopicExistsException])
 
-    val topicToDescription = client.describeTopics(topics.asJava).all.get()
+    val topicToDescription = 
client.describeTopics(topics.asJava).allTopicNames.get()
     assertEquals(topics.toSet, topicToDescription.keySet.asScala)
 
     val topic0 = topicToDescription.get("mytopic")
@@ -225,7 +225,7 @@ abstract class BaseAdminIntegrationTest extends 
IntegrationTestHarness with Logg
                        expectedNumPartitionsOpt: Option[Int] = None): 
TopicDescription = {
     var result: TopicDescription = null
     waitUntilTrue(() => {
-      val topicResult = client.describeTopics(Set(topic).asJava, 
describeOptions).values.get(topic)
+      val topicResult = client.describeTopics(Set(topic).asJava, 
describeOptions).topicNameValues().get(topic)
       try {
         result = topicResult.get
         expectedNumPartitionsOpt.map(_ == 
result.partitions.size).getOrElse(true)
diff --git 
a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
 
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
index 6cc7c48..22fee47 100644
--- 
a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
@@ -183,13 +183,13 @@ class DescribeAuthorizedOperationsTest extends 
IntegrationTestHarness with SaslS
     createTopic(Topic2)
 
     // test without includeAuthorizedOperations flag
-    var describeTopicsResult = client.describeTopics(Set(Topic1, 
Topic2).asJava).all.get()
+    var describeTopicsResult = client.describeTopics(Set(Topic1, 
Topic2).asJava).allTopicNames.get()
     assertNull(describeTopicsResult.get(Topic1).authorizedOperations)
     assertNull(describeTopicsResult.get(Topic2).authorizedOperations)
 
     // test with includeAuthorizedOperations flag
     describeTopicsResult = client.describeTopics(Set(Topic1, Topic2).asJava,
-      new DescribeTopicsOptions().includeAuthorizedOperations(true)).all.get()
+      new 
DescribeTopicsOptions().includeAuthorizedOperations(true)).allTopicNames.get()
     assertEquals(Set(AclOperation.DESCRIBE), 
describeTopicsResult.get(Topic1).authorizedOperations().asScala.toSet)
     assertEquals(Set(AclOperation.DESCRIBE), 
describeTopicsResult.get(Topic2).authorizedOperations().asScala.toSet)
 
@@ -201,7 +201,7 @@ class DescribeAuthorizedOperationsTest extends 
IntegrationTestHarness with SaslS
     val expectedOperations = 
AclEntry.supportedOperations(ResourceType.TOPIC).asJava
 
     describeTopicsResult = client.describeTopics(Set(Topic1, Topic2).asJava,
-      new DescribeTopicsOptions().includeAuthorizedOperations(true)).all.get()
+      new 
DescribeTopicsOptions().includeAuthorizedOperations(true)).allTopicNames.get()
     assertEquals(expectedOperations, 
describeTopicsResult.get(Topic1).authorizedOperations())
     assertEquals(Set(AclOperation.DESCRIBE, AclOperation.DELETE),
       describeTopicsResult.get(Topic2).authorizedOperations().asScala.toSet)
diff --git 
a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index f361641..df60010 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -346,7 +346,7 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
     consumer.assign(List(tp).asJava)
     assertThrows(classOf[TopicAuthorizationException], () => 
consumeRecords(consumer, numRecords, topic = tp.topic))
     val adminClient = createAdminClient()
-    val e1 = assertThrows(classOf[ExecutionException], () => 
adminClient.describeTopics(Set(topic).asJava).all().get())
+    val e1 = assertThrows(classOf[ExecutionException], () => 
adminClient.describeTopics(Set(topic).asJava).allTopicNames().get())
     assertTrue(e1.getCause.isInstanceOf[TopicAuthorizationException], 
"Unexpected exception " + e1.getCause)
 
     // Verify successful produce/consume/describe on another topic using the 
same producer, consumer and adminClient
@@ -356,9 +356,9 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
     sendRecords(producer, numRecords, tp2)
     consumer.assign(List(tp2).asJava)
     consumeRecords(consumer, numRecords, topic = topic2)
-    val describeResults = adminClient.describeTopics(Set(topic, 
topic2).asJava).values
+    val describeResults = adminClient.describeTopics(Set(topic, 
topic2).asJava).topicNameValues()
     assertEquals(1, describeResults.get(topic2).get().partitions().size())
-    val e2 = assertThrows(classOf[ExecutionException], () => 
adminClient.describeTopics(Set(topic).asJava).all().get())
+    val e2 = assertThrows(classOf[ExecutionException], () => 
adminClient.describeTopics(Set(topic).asJava).allTopicNames().get())
     assertTrue(e2.getCause.isInstanceOf[TopicAuthorizationException], 
"Unexpected exception " + e2.getCause)
 
     // Verify that consumer manually assigning both authorized and 
unauthorized topic doesn't consume
@@ -382,7 +382,7 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
     }
     sendRecords(producer, numRecords, tp)
     consumeRecordsIgnoreOneAuthorizationException(consumer, numRecords, 
startingOffset = 0, topic)
-    val describeResults2 = adminClient.describeTopics(Set(topic, 
topic2).asJava).values
+    val describeResults2 = adminClient.describeTopics(Set(topic, 
topic2).asJava).topicNameValues
     assertEquals(1, describeResults2.get(topic).get().partitions().size())
     assertEquals(1, describeResults2.get(topic2).get().partitions().size())
   }
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 6885c79..36586ed 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -40,17 +40,17 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.requests.{DeleteRecordsRequest, 
MetadataResponse}
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, 
ResourceType}
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.{ConsumerGroupState, ElectionType, 
TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica}
+import org.apache.kafka.common.{ConsumerGroupState, ElectionType, 
TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, 
Uuid}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
 import org.slf4j.LoggerFactory
 
 import scala.annotation.nowarn
-import scala.jdk.CollectionConverters._
 import scala.collection.Seq
 import scala.compat.java8.OptionConverters._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
+import scala.jdk.CollectionConverters._
 import scala.util.Random
 
 /**
@@ -145,7 +145,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val controller = servers.find(_.config.brokerId == 
TestUtils.waitUntilControllerElected(zkClient)).get
     controller.shutdown()
     controller.awaitShutdown()
-    val topicDesc = client.describeTopics(topics.asJava).all.get()
+    val topicDesc = client.describeTopics(topics.asJava).allTopicNames.get()
     assertEquals(topics.toSet, topicDesc.keySet.asScala)
   }
 
@@ -161,13 +161,29 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     waitForTopics(client, Seq(existingTopic), List())
 
     val nonExistingTopic = "non-existing"
-    val results = client.describeTopics(Seq(nonExistingTopic, 
existingTopic).asJava).values
+    val results = client.describeTopics(Seq(nonExistingTopic, 
existingTopic).asJava).topicNameValues()
     assertEquals(existingTopic, results.get(existingTopic).get.name)
     assertThrows(classOf[ExecutionException], () => 
results.get(nonExistingTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException]
     assertEquals(None, zkClient.getTopicPartitionCount(nonExistingTopic))
   }
 
   @Test
+  def testDescribeTopicsWithIds(): Unit = {
+    client = Admin.create(createConfig)
+
+    val existingTopic = "existing-topic"
+    client.createTopics(Seq(existingTopic).map(new NewTopic(_, 1, 
1.toShort)).asJava).all.get()
+    waitForTopics(client, Seq(existingTopic), List())
+    val existingTopicId = 
zkClient.getTopicIdsForTopics(Set(existingTopic)).values.head
+
+    val nonExistingTopicId = Uuid.randomUuid()
+
+    val results = 
client.describeTopics(TopicCollection.ofTopicIds(Seq(existingTopicId, 
nonExistingTopicId).asJava)).topicIdValues()
+    assertEquals(existingTopicId, results.get(existingTopicId).get.topicId())
+    assertThrows(classOf[ExecutionException], () => 
results.get(nonExistingTopicId).get).getCause.isInstanceOf[UnknownTopicIdException]
+  }
+
+  @Test
   def testDescribeCluster(): Unit = {
     client = Admin.create(createConfig)
     val result = client.describeCluster
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index 2d4abf4..2a7d5c9 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -136,7 +136,7 @@ class SaslClientsWithInvalidCredentialsTest extends 
IntegrationTestHarness with
 
     def describeTopic(): Unit = {
       try {
-        val response = 
adminClient.describeTopics(Collections.singleton(topic)).all.get
+        val response = 
adminClient.describeTopics(Collections.singleton(topic)).allTopicNames.get
         assertEquals(1, response.size)
         response.forEach { (topic, description) =>
           assertEquals(numPartitions, description.partitions.size)
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 08a55ae..84e8099 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -675,7 +675,7 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     consumer.commitSync()
 
     def partitionInfo: TopicPartitionInfo =
-      
adminClients.head.describeTopics(Collections.singleton(topic)).values.get(topic).get().partitions().get(0)
+      
adminClients.head.describeTopics(Collections.singleton(topic)).topicNameValues().get(topic).get().partitions().get(0)
 
     val partitionInfo0 = partitionInfo
     assertEquals(partitionInfo0.replicas.get(0), partitionInfo0.leader)
diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index b7ca0f7..25e79ad 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -393,7 +393,7 @@ class KRaftClusterTest {
         var currentMapping: Seq[Seq[Int]] = Seq()
         val expectedMapping = Seq(Seq(2, 1, 0), Seq(0, 1, 2), Seq(2, 3), 
Seq(3, 2, 0, 1))
         TestUtils.waitUntilTrue( () => {
-          val topicInfoMap = 
admin.describeTopics(Collections.singleton("foo")).all().get()
+          val topicInfoMap = 
admin.describeTopics(Collections.singleton("foo")).allTopicNames().get()
           if (topicInfoMap.containsKey("foo")) {
             currentMapping = 
translatePartitionInfoToSeq(topicInfoMap.get("foo").partitions())
             expectedMapping.equals(currentMapping)
diff --git 
a/core/src/test/scala/integration/kafka/server/MetadataRequestBetweenDifferentIbpTest.scala
 
b/core/src/test/scala/integration/kafka/server/MetadataRequestBetweenDifferentIbpTest.scala
new file mode 100644
index 0000000..387c244
--- /dev/null
+++ 
b/core/src/test/scala/integration/kafka/server/MetadataRequestBetweenDifferentIbpTest.scala
@@ -0,0 +1,96 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import kafka.api.{ApiVersion, KAFKA_2_8_IV0}
+import kafka.network.SocketServer
+import kafka.utils.TestUtils
+import kafka.zk.ZkVersion
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.message.MetadataRequestData
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import scala.collection.{Map, Seq}
+
+class MetadataRequestBetweenDifferentIbpTest extends BaseRequestTest {
+
+  override def brokerCount: Int = 3
+  override def generateConfigs: Seq[KafkaConfig] = {
+    Seq(
+      createConfig(0, KAFKA_2_8_IV0),
+      createConfig(1, ApiVersion.latestVersion),
+      createConfig(2, ApiVersion.latestVersion)
+    )
+  }
+
+  @Test
+  def testUnknownTopicId(): Unit = {
+    val topic = "topic"
+
+    // Force controller re-election until a broker with the latest IBP becomes the 
controller
+    ensureControllerIn(Seq(1, 2))
+    createTopic(topic, Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1)))
+
+    val resp1 = sendMetadataRequest(new MetadataRequest(requestData(topic, 
Uuid.ZERO_UUID), 12.toShort), controllerSocketServer)
+    val topicId = resp1.topicMetadata.iterator().next().topicId()
+
+    // We could still get topic metadata by topicId
+   val topicMetadata = sendMetadataRequest(new 
MetadataRequest(requestData(null, topicId), 12.toShort), controllerSocketServer)
+      .topicMetadata.iterator().next()
+    assertEquals(topicId, topicMetadata.topicId())
+    assertEquals(topic, topicMetadata.topic())
+
+    // Make the broker whose IBP is KAFKA_2_8_IV0 the controller
+    ensureControllerIn(Seq(0))
+
+    // Restart the broker whose ibp is higher, and the controller will send 
metadata request to it
+    killBroker(1)
+    restartDeadBrokers()
+
+    // Send request to a broker whose ibp is higher and restarted just now
+    val resp2 = sendMetadataRequest(new MetadataRequest(requestData(topic, 
topicId), 12.toShort), brokerSocketServer(1))
+    assertEquals(Errors.UNKNOWN_TOPIC_ID, 
resp2.topicMetadata.iterator().next().error())
+  }
+
+  private def ensureControllerIn(brokerIds: Seq[Int]): Unit = {
+    while (!brokerIds.contains(controllerSocketServer.config.brokerId)) {
+      zkClient.deleteController(ZkVersion.MatchAnyVersion)
+      TestUtils.waitUntilControllerElected(zkClient)
+    }
+  }
+
+  private def createConfig(nodeId: Int,interBrokerVersion: ApiVersion): 
KafkaConfig = {
+    val props = TestUtils.createBrokerConfig(nodeId, zkConnect)
+    props.put(KafkaConfig.InterBrokerProtocolVersionProp, 
interBrokerVersion.version)
+    KafkaConfig.fromProps(props)
+  }
+
+  def requestData(topic: String, topicId: Uuid): MetadataRequestData = {
+    val data = new MetadataRequestData
+    data.topics.add(new 
MetadataRequestData.MetadataRequestTopic().setName(topic).setTopicId(topicId))
+    data
+  }
+
+  private def sendMetadataRequest(request: MetadataRequest, destination: 
SocketServer): MetadataResponse = {
+    connectAndReceive[MetadataResponse](request, destination)
+  }
+
+}
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 4ce7658..4a53c67 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -147,7 +147,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
   private def waitUntilTopicGone(adminClient: Admin, topicName: String): Unit 
= {
     TestUtils.waitUntilTrue(() => {
       try {
-        
adminClient.describeTopics(util.Collections.singletonList(topicName)).all().get()
+        
adminClient.describeTopics(util.Collections.singletonList(topicName)).allTopicNames().get()
         false
       } catch {
         case e: ExecutionException =>
diff --git 
a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
index 9a9fca8..19dbd6a 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
@@ -110,7 +110,7 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
 
     val partitions = adminClient
       .describeTopics(Collections.singletonList(testTopicName))
-      .all()
+      .allTopicNames()
       .get()
       .get(testTopicName)
       .partitions()
@@ -125,7 +125,7 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
 
     val partitions = adminClient
       .describeTopics(Collections.singletonList(testTopicName))
-      .all()
+      .allTopicNames()
       .get()
       .get(testTopicName)
       .partitions()
@@ -140,7 +140,7 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
 
     val partitions = adminClient
       .describeTopics(Collections.singletonList(testTopicName))
-      .all()
+      .allTopicNames()
       .get()
       .get(testTopicName)
       .partitions()
@@ -190,7 +190,7 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
 
     val partitions = adminClient
       .describeTopics(Collections.singletonList(testTopicName))
-      .all()
+      .allTopicNames()
       .get()
       .get(testTopicName)
       .partitions()
@@ -287,7 +287,7 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
     topicService.alterTopic(new TopicCommandOptions(
       Array("--topic", testTopicName, "--partitions", "3")))
 
-    val topicDescription = 
adminClient.describeTopics(Collections.singletonList(testTopicName)).values().get(testTopicName).get()
+    val topicDescription = 
adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get()
     assertTrue(topicDescription.partitions().size() == 3)
   }
 
@@ -300,7 +300,7 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
     topicService.alterTopic(new TopicCommandOptions(
       Array("--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2", 
"--partitions", "3")))
 
-    val topicDescription = 
adminClient.describeTopics(Collections.singletonList(testTopicName)).values().get(testTopicName).get()
+    val topicDescription = 
adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get()
     assertTrue(topicDescription.partitions().size() == 3)
     assertEquals(List(4,2), 
topicDescription.partitions().get(2).replicas().asScala.map(_.id()))
   }
@@ -491,7 +491,7 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
     try {
       // check which partition is on broker 0 which we'll kill
       val testTopicDescription = 
adminClient.describeTopics(Collections.singletonList(testTopicName))
-        .all().get().asScala(testTopicName)
+        .allTopicNames().get().asScala(testTopicName)
       val partitionOnBroker0 = 
testTopicDescription.partitions().asScala.find(_.leader().id() == 
0).get.partition()
 
       killBroker(0)
@@ -586,7 +586,7 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
     val brokerIds = servers.map(_.config.brokerId)
     TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, 
Set(tp), throttleBytes = 1)
 
-    val testTopicDesc = 
adminClient.describeTopics(Collections.singleton(testTopicName)).all().get().get(testTopicName)
+    val testTopicDesc = 
adminClient.describeTopics(Collections.singleton(testTopicName)).allTopicNames().get().get(testTopicName)
     val firstPartition = testTopicDesc.partitions().asScala.head
 
     val replicasOfFirstPartition = 
firstPartition.replicas().asScala.map(_.id())
@@ -797,7 +797,7 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
       Collections.emptyList(), Collections.emptyList())
     val describeResult = 
AdminClientTestUtils.describeTopicsResult(testTopicName, new TopicDescription(
       testTopicName, false, Collections.singletonList(topicPartitionInfo)))
-    when(adminClient.describeTopics(any())).thenReturn(describeResult)
+    
when(adminClient.describeTopics(any(classOf[java.util.Collection[String]]))).thenReturn(describeResult)
 
     val result = AdminClientTestUtils.createPartitionsResult(testTopicName, 
Errors.THROTTLING_QUOTA_EXCEEDED.exception())
     when(adminClient.createPartitions(any(), any())).thenReturn(result)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 9739e92..1ddd63a 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -2232,6 +2232,117 @@ class KafkaApisTest {
     
assertTrue(response.topicsByError(Errors.UNKNOWN_TOPIC_OR_PARTITION).isEmpty)
   }
 
+  @Test
+  def testUnauthorizedTopicMetadataRequest(): Unit = {
+    // 1. Set up broker information
+    val plaintextListener = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+    val broker = new UpdateMetadataBroker()
+      .setId(0)
+      .setRack("rack")
+      .setEndpoints(Seq(
+        new UpdateMetadataEndpoint()
+          .setHost("broker0")
+          .setPort(9092)
+          .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
+          .setListener(plaintextListener.value)
+      ).asJava)
+
+    // 2. Set up authorizer
+    val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+    val unauthorizedTopic = "unauthorized-topic"
+    val authorizedTopic = "authorized-topic"
+
+    val expectedActions = Seq(
+      new Action(AclOperation.DESCRIBE, new 
ResourcePattern(ResourceType.TOPIC, unauthorizedTopic, PatternType.LITERAL), 1, 
true, true),
+      new Action(AclOperation.DESCRIBE, new 
ResourcePattern(ResourceType.TOPIC, authorizedTopic, PatternType.LITERAL), 1, 
true, true)
+    )
+
+    // Here we need to use AuthHelperTest.matchSameElements instead of 
EasyMock.eq since the order of the actions in the request is unknown
+    EasyMock.expect(authorizer.authorize(anyObject[RequestContext], 
AuthHelperTest.matchSameElements(expectedActions.asJava)))
+      .andAnswer { () =>
+      val actions = 
EasyMock.getCurrentArguments.apply(1).asInstanceOf[util.List[Action]].asScala
+      actions.map { action =>
+        if (action.resourcePattern().name().equals(authorizedTopic))
+          AuthorizationResult.ALLOWED
+        else
+          AuthorizationResult.DENIED
+      }.asJava
+    }.times(2)
+
+    // 3. Set up MetadataCache
+    val authorizedTopicId = Uuid.randomUuid()
+    val unauthorizedTopicId = Uuid.randomUuid();
+
+    val topicIds = new util.HashMap[String, Uuid]()
+    topicIds.put(authorizedTopic, authorizedTopicId)
+    topicIds.put(unauthorizedTopic, unauthorizedTopicId)
+
+    def createDummyPartitionStates(topic: String) = {
+      new UpdateMetadataPartitionState()
+        .setTopicName(topic)
+        .setPartitionIndex(0)
+        .setControllerEpoch(0)
+        .setLeader(0)
+        .setLeaderEpoch(0)
+        .setReplicas(Collections.singletonList(0))
+        .setZkVersion(0)
+        .setIsr(Collections.singletonList(0))
+    }
+
+    // Send UpdateMetadataReq to update MetadataCache
+    val partitionStates = Seq(unauthorizedTopic, 
authorizedTopic).map(createDummyPartitionStates)
+
+    val updateMetadataRequest = new 
UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
+      0, 0, partitionStates.asJava, Seq(broker).asJava, topicIds).build()
+    metadataCache.asInstanceOf[ZkMetadataCache].updateMetadata(correlationId = 
0, updateMetadataRequest)
+
+    // 4. Send TopicMetadataReq using topicId
+    val metadataReqByTopicId = new 
MetadataRequest.Builder(util.Arrays.asList(authorizedTopicId, 
unauthorizedTopicId)).build()
+    val repByTopicId = buildRequest(metadataReqByTopicId, plaintextListener)
+    val capturedMetadataByTopicIdResp = expectNoThrottling(repByTopicId)
+    EasyMock.replay(clientRequestQuotaManager, requestChannel, authorizer)
+
+    createKafkaApis(authorizer = 
Some(authorizer)).handleTopicMetadataRequest(repByTopicId)
+    val metadataByTopicIdResp = 
capturedMetadataByTopicIdResp.getValue.asInstanceOf[MetadataResponse]
+
+    val metadataByTopicId = 
metadataByTopicIdResp.data().topics().asScala.groupBy(_.topicId()).map(kv => 
(kv._1, kv._2.head))
+
+    metadataByTopicId.foreach{ case (topicId, metadataResponseTopic) =>
+      if (topicId == unauthorizedTopicId) {
+        // Return TOPIC_AUTHORIZATION_FAILED for an unauthorized topic id, even 
though this leaks the existence of the topic id
+        assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), 
metadataResponseTopic.errorCode())
+        // Do not return topic information on unauthorized error
+        assertNull(metadataResponseTopic.name())
+      } else {
+        assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode())
+        assertEquals(authorizedTopic, metadataResponseTopic.name())
+      }
+    }
+
+    // 5. Send TopicMetadataReq using topic name
+    EasyMock.reset(clientRequestQuotaManager, requestChannel)
+    val metadataReqByTopicName = new 
MetadataRequest.Builder(util.Arrays.asList(authorizedTopic, unauthorizedTopic), 
false).build()
+    val repByTopicName = buildRequest(metadataReqByTopicName, 
plaintextListener)
+    val capturedMetadataByTopicNameResp = expectNoThrottling(repByTopicName)
+    EasyMock.replay(clientRequestQuotaManager, requestChannel)
+
+    createKafkaApis(authorizer = 
Some(authorizer)).handleTopicMetadataRequest(repByTopicName)
+    val metadataByTopicNameResp = 
capturedMetadataByTopicNameResp.getValue.asInstanceOf[MetadataResponse]
+
+    val metadataByTopicName = 
metadataByTopicNameResp.data().topics().asScala.groupBy(_.name()).map(kv => 
(kv._1, kv._2.head))
+
+    metadataByTopicName.foreach{ case (topicName, metadataResponseTopic) =>
+      if (topicName == unauthorizedTopic) {
+        assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), 
metadataResponseTopic.errorCode())
+        // Do not return topic Id on unauthorized error
+        assertEquals(Uuid.ZERO_UUID, metadataResponseTopic.topicId())
+      } else {
+        assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode())
+        assertEquals(authorizedTopicId, metadataResponseTopic.topicId())
+      }
+    }
+  }
+
   /**
    * Verifies that sending a fetch request with version 9 works correctly when
    * ReplicaManager.getLogConfig returns None.
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 5620bf7..64003b7 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1631,7 +1631,7 @@ object TestUtils extends Logging {
 
     waitUntilTrue(() => {
       try {
-        val topicResult = 
client.describeTopics(Arrays.asList(topic)).all.get.get(topic)
+        val topicResult = 
client.describeTopics(Arrays.asList(topic)).allTopicNames.get.get(topic)
         val partitionResult = topicResult.partitions.get(partition)
         Option(partitionResult.leader).map(_.id) == leader
       } catch {
@@ -1643,7 +1643,7 @@ object TestUtils extends Logging {
   def waitForBrokersOutOfIsr(client: Admin, partition: Set[TopicPartition], 
brokerIds: Set[Int]): Unit = {
     waitUntilTrue(
       () => {
-        val description = 
client.describeTopics(partition.map(_.topic).asJava).all.get.asScala
+        val description = 
client.describeTopics(partition.map(_.topic).asJava).allTopicNames.get.asScala
         val isr = description
           .values
           .flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
@@ -1659,7 +1659,7 @@ object TestUtils extends Logging {
   def waitForBrokersInIsr(client: Admin, partition: TopicPartition, brokerIds: 
Set[Int]): Unit = {
     waitUntilTrue(
       () => {
-        val description = 
client.describeTopics(Set(partition.topic).asJava).all.get.asScala
+        val description = 
client.describeTopics(Set(partition.topic).asJava).allTopicNames.get.asScala
         val isr = description
           .values
           .flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
@@ -1675,7 +1675,7 @@ object TestUtils extends Logging {
   def waitForReplicasAssigned(client: Admin, partition: TopicPartition, 
brokerIds: Seq[Int]): Unit = {
     waitUntilTrue(
       () => {
-        val description = 
client.describeTopics(Set(partition.topic).asJava).all.get.asScala
+        val description = 
client.describeTopics(Set(partition.topic).asJava).allTopicNames.get.asScala
         val replicas = description
           .values
           .flatMap(_.partitions.asScala.flatMap(_.replicas.asScala))
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index 8d9e2da..6b47e26 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -399,7 +399,7 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
                                                       String topicName) throws 
InterruptedException, ExecutionException {
         log.debug("Getting topic details to check for partition count and 
replication factor.");
         TopicDescription topicDescription = 
adminClient.describeTopics(Collections.singleton(topicName))
-                                                       
.values().get(topicName).get();
+                                                       
.topicNameValues().get(topicName).get();
         int expectedPartitions = rlmmConfig.metadataTopicPartitionsCount();
         int topicPartitionsSize = topicDescription.partitions().size();
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 12e6242..6954921 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -157,7 +157,7 @@ public class InternalTopicManager {
             Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = 
Collections.emptyMap();
             if (!topicDescriptionsStillToValidate.isEmpty()) {
                 final DescribeTopicsResult describeTopicsResult = 
adminClient.describeTopics(topicDescriptionsStillToValidate);
-                descriptionsForTopic = describeTopicsResult.values();
+                descriptionsForTopic = describeTopicsResult.topicNameValues();
             }
             Map<String, KafkaFuture<Config>> configsForTopic = 
Collections.emptyMap();
             if (!topicConfigsStillToValidate.isEmpty()) {
@@ -515,7 +515,7 @@ public class InternalTopicManager {
         log.debug("Trying to check if topics {} have been created with 
expected number of partitions.", topics);
 
         final DescribeTopicsResult describeTopicsResult = 
adminClient.describeTopics(topics);
-        final Map<String, KafkaFuture<TopicDescription>> futures = 
describeTopicsResult.values();
+        final Map<String, KafkaFuture<TopicDescription>> futures = 
describeTopicsResult.topicNameValues();
 
         final Map<String, Integer> existedTopicPartition = new HashMap<>();
         for (final Map.Entry<String, KafkaFuture<TopicDescription>> 
topicFuture : futures.entrySet()) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
index c2dee61..1e7f685 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
@@ -675,7 +675,7 @@ public class KStreamRepartitionIntegrationTest {
     private int getNumberOfPartitionsForTopic(final String topic) throws 
Exception {
         try (final AdminClient adminClient = createAdminClient()) {
             final TopicDescription topicDescription = 
adminClient.describeTopics(Collections.singleton(topic))
-                                                                 .values()
+                                                                 
.topicNameValues()
                                                                  .get(topic)
                                                                  .get();
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
index c0fc796..512d1c1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
@@ -205,7 +205,7 @@ public class 
StreamTableJoinTopologyOptimizationIntegrationTest {
     private int getNumberOfPartitionsForTopic(final String topic) throws 
Exception {
         try (final AdminClient adminClient = createAdminClient()) {
             final TopicDescription topicDescription = 
adminClient.describeTopics(Collections.singleton(topic))
-                                                                 .values()
+                                                                 
.topicNameValues()
                                                                  .get(topic)
                                                                  
.get(IntegrationTestUtils.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index abfa5c4..853d8d7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -632,17 +632,17 @@ public class InternalTopicManagerTest {
             {
                 add(new TopicPartitionInfo(0, broker1, singleReplica, 
Collections.emptyList()));
             }
-        }), 
mockAdminClient.describeTopics(Collections.singleton(topic1)).values().get(topic1).get());
+        }), 
mockAdminClient.describeTopics(Collections.singleton(topic1)).topicNameValues().get(topic1).get());
         assertEquals(new TopicDescription(topic2, false, new 
ArrayList<TopicPartitionInfo>() {
             {
                 add(new TopicPartitionInfo(0, broker1, singleReplica, 
Collections.emptyList()));
             }
-        }), 
mockAdminClient.describeTopics(Collections.singleton(topic2)).values().get(topic2).get());
+        }), 
mockAdminClient.describeTopics(Collections.singleton(topic2)).topicNameValues().get(topic2).get());
         assertEquals(new TopicDescription(topic3, false, new 
ArrayList<TopicPartitionInfo>() {
             {
                 add(new TopicPartitionInfo(0, broker1, singleReplica, 
Collections.emptyList()));
             }
-        }), 
mockAdminClient.describeTopics(Collections.singleton(topic3)).values().get(topic3).get());
+        }), 
mockAdminClient.describeTopics(Collections.singleton(topic3)).topicNameValues().get(topic3).get());
 
         final ConfigResource resource = new 
ConfigResource(ConfigResource.Type.TOPIC, topic1);
         final ConfigResource resource2 = new 
ConfigResource(ConfigResource.Type.TOPIC, topic2);
@@ -1732,7 +1732,7 @@ public class InternalTopicManagerTest {
 
     private static class MockDescribeTopicsResult extends DescribeTopicsResult 
{
         MockDescribeTopicsResult(final Map<String, 
KafkaFuture<TopicDescription>> futures) {
-            super(futures);
+            super(null, futures);
         }
     }
 
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java 
b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index a5d6c7a..e40ca7a 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -338,7 +338,7 @@ public class ClientCompatibilityTest {
             throws InterruptedException, ExecutionException {
         while (true) {
             try {
-                client.describeTopics(topics).all().get();
+                client.describeTopics(topics).allTopicNames().get();
                 break;
             } catch (ExecutionException e) {
                 if (e.getCause() instanceof UnknownTopicOrPartitionException)
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
index b8f1c9d..10a368a 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
@@ -750,7 +750,7 @@ public abstract class TransactionsCommand {
             List<TopicPartition> topicPartitions
         ) throws Exception {
             try {
-                Map<String, TopicDescription> topicDescriptions = 
admin.describeTopics(topics).all().get();
+                Map<String, TopicDescription> topicDescriptions = 
admin.describeTopics(topics).allTopicNames().get();
                 topicDescriptions.forEach((topic, description) -> {
                     description.partitions().forEach(partitionInfo -> {
                         if (!brokerId.isPresent() || 
hasReplica(brokerId.get(), partitionInfo)) {
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
index 968c34a..98704e5 100644
--- a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
@@ -540,7 +540,7 @@ public class TransactionsCommandTest {
         Map<String, TopicDescription> descriptions
     ) {
         DescribeTopicsResult result = Mockito.mock(DescribeTopicsResult.class);
-        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        
Mockito.when(result.allTopicNames()).thenReturn(completedFuture(descriptions));
         Mockito.when(admin.describeTopics(new 
ArrayList<>(descriptions.keySet()))).thenReturn(result);
     }
 
diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java 
b/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
index 812129e..23c0ba4 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
@@ -279,7 +279,7 @@ public final class WorkerUtils {
             try {
                 DescribeTopicsResult topicsResult = adminClient.describeTopics(
                         topicsToVerify, new 
DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
-                return topicsResult.all().get();
+                return topicsResult.allTopicNames().get();
             } catch (ExecutionException exception) {
                 if (exception.getCause() instanceof 
UnknownTopicOrPartitionException) {
                     lastException = (UnknownTopicOrPartitionException) 
exception.getCause();
@@ -321,7 +321,7 @@ public final class WorkerUtils {
         List<TopicPartition> out = new ArrayList<>();
         DescribeTopicsResult topicsResult = adminClient.describeTopics(
             matchedTopics, new 
DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT));
-        Map<String, TopicDescription> topicDescriptionMap = 
topicsResult.all().get();
+        Map<String, TopicDescription> topicDescriptionMap = 
topicsResult.allTopicNames().get();
         for (TopicDescription desc: topicDescriptionMap.values()) {
             List<TopicPartitionInfo> partitions = desc.partitions();
             for (TopicPartitionInfo info: partitions) {
diff --git 
a/trogdor/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java 
b/trogdor/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
index 738ec3c..a5fbc85 100644
--- a/trogdor/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
+++ b/trogdor/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java
@@ -81,7 +81,7 @@ public class WorkerUtilsTest {
                 Collections.singletonList(
                     new TopicPartitionInfo(0, broker1, singleReplica, 
Collections.<Node>emptyList()))),
             adminClient.describeTopics(
-                
Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get()
+                
Collections.singleton(TEST_TOPIC)).topicNameValues().get(TEST_TOPIC).get()
         );
     }
 
@@ -98,7 +98,7 @@ public class WorkerUtilsTest {
                 Collections.singletonList(
                     new TopicPartitionInfo(0, broker1, singleReplica, 
Collections.<Node>emptyList()))),
             adminClient.describeTopics(
-                
Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get()
+                
Collections.singleton(TEST_TOPIC)).topicNameValues().get(TEST_TOPIC).get()
         );
     }
 
@@ -177,7 +177,7 @@ public class WorkerUtilsTest {
                 TEST_TOPIC, false,
                 Collections.singletonList(
                     new TopicPartitionInfo(0, broker1, singleReplica, 
Collections.<Node>emptyList()))),
-            
adminClient.describeTopics(Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get()
+            
adminClient.describeTopics(Collections.singleton(TEST_TOPIC)).topicNameValues().get(TEST_TOPIC).get()
         );
     }
 

Reply via email to