rajinisivaram commented on a change in pull request #9769: URL: https://github.com/apache/kafka/pull/9769#discussion_r562233235
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java ########## @@ -65,6 +65,19 @@ public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> this(name, internal, partitions, Collections.emptySet()); } + /** + * Create an instance with the specified parameters. + * + * @param name The topic name + * @param internal Whether the topic is internal to Kafka + * @param partitions A list of partitions where the index represents the partition id and the element contains + * leadership and replica information for that partition. + * @param topicId the topic id + */ + public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions, Uuid topicId) { Review comment: Do we really need this constructor in the public class? We could just use the one below? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -1822,6 +1829,88 @@ void handleFailure(Throwable throwable) { return new DescribeTopicsResult(new HashMap<>(topicFutures)); } + @Override + public DescribeTopicsResultWithIds describeTopicsWithIds(Collection<Uuid> topicIds, DescribeTopicsOptions options) { + + final Map<Uuid, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicIds.size()); + final List<Uuid> topicIdsList = new ArrayList<>(); + for (Uuid topicId : topicIds) { + if (topicIdIsUnrepresentable(topicId)) { + KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); + future.completeExceptionally(new InvalidTopicException("The given topic id '" + + topicId + "' cannot be represented in a request.")); + topicFutures.put(topicId, future); + } else if (!topicFutures.containsKey(topicId)) { + topicFutures.put(topicId, new KafkaFutureImpl<>()); + topicIdsList.add(topicId); + } + } + final long now = time.milliseconds(); + Call call = new Call("describeTopicsWithIds", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + + @Override + MetadataRequest.Builder createRequest(int timeoutMs) { + return new MetadataRequest.Builder(new MetadataRequestData() + .setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList)) + .setAllowAutoTopicCreation(false) + .setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations())); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + MetadataResponse response = (MetadataResponse) abstractResponse; + // Handle server responses for particular topics. + Cluster cluster = response.cluster(); + Map<Uuid, Errors> errors = response.errorsByTopicId(); + for (Map.Entry<Uuid, KafkaFutureImpl<TopicDescription>> entry : topicFutures.entrySet()) { + Uuid topicId = entry.getKey(); + KafkaFutureImpl<TopicDescription> future = entry.getValue(); + + String topicName = cluster.topicName(topicId); + if (topicName == null) { + future.completeExceptionally(new UnknownTopicIdException("TopicId " + topicId + " not found.")); + continue; + } + Errors topicError = errors.get(topicId); + if (topicError != null) { + future.completeExceptionally(topicError.exception()); + continue; + } + + boolean isInternal = cluster.internalTopics().contains(topicName); + List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topicName); + List<TopicPartitionInfo> partitions = new ArrayList<>(partitionInfos.size()); + for (PartitionInfo partitionInfo : partitionInfos) { + TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo( + partitionInfo.partition(), leader(partitionInfo), Arrays.asList(partitionInfo.replicas()), + Arrays.asList(partitionInfo.inSyncReplicas())); + partitions.add(topicPartitionInfo); + } + partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition)); + TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions, + validAclOperations(response.topicAuthorizedOperations(topicName).get()), topicId); + future.complete(topicDescription); + } + } + + private Node leader(PartitionInfo partitionInfo) { Review comment: This could be shared with `describeTopics` as well? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -1190,8 +1192,31 @@ class KafkaApis(val requestChannel: RequestChannel, val metadataRequest = request.body[MetadataRequest] val requestVersion = request.header.apiVersion + // Check if topicId is presented firstly. + val topicIds = metadataRequest.topicIds.asScala.toSet.filterNot(_ == Uuid.ZERO_UUID) + val supportedVersionTopicIds = if (config.interBrokerProtocolVersion >= KAFKA_2_8_IV1) topicIds else Set.empty[Uuid] Review comment: @jolshan Shouldn't we use `KAFKA_2_8_IV1` for `KafkaConfig.usesTopicId` and use that here? ########## File path: core/src/main/scala/kafka/admin/TopicCommand.scala ########## @@ -635,6 +650,11 @@ object TopicCommand extends Logging { .withRequiredArg .describedAs("topic") .ofType(classOf[String]) + private val topicIdOpt = parser.accepts("topic-id", "The topic-id to describe." + + "This is used only with --bootstrap-server option for manage topic.") Review comment: for manage topic => `for describing topics`? ########## File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java ########## @@ -91,12 +91,31 @@ public int throttleTimeMs() { public Map<String, Errors> errors() { Map<String, Errors> errors = new HashMap<>(); for (MetadataResponseTopic metadata : data.topics()) { + if (metadata.name() == null) { + throw new NullPointerException("Use errorsByTopicId() when manage topic using topic id"); Review comment: We should never get here through the public APIs right? We could use IllegalStateException. ########## File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ########## @@ -350,7 +351,8 @@ POSITION_OUT_OF_RANGE( 99, "Requested position is not greater than or equal to zero, and less than the size of the snapshot.", - PositionOutOfRangeException::new); + PositionOutOfRangeException::new), + UNKNOWN_TOPIC_ID(100, "The topic ID dose not exist", UnknownTopicIdException::new); Review comment: typo: `does` ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java ########## @@ -79,6 +92,16 @@ public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> this(name, internal, partitions, authorizedOperations, Uuid.ZERO_UUID); } + /** + * Create an instance with the specified parameters. + * + * @param name The topic name + * @param internal Whether the topic is internal to Kafka + * @param partitions A list of partitions where the index represents the partition id and the element contains + * leadership and replica information for that partition. + * @param authorizedOperations authorized operations for this topic, or null if this is not known. Review comment: Looks like default is empty set rather than null? ########## File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java ########## @@ -91,12 +91,31 @@ public int throttleTimeMs() { public Map<String, Errors> errors() { Map<String, Errors> errors = new HashMap<>(); for (MetadataResponseTopic metadata : data.topics()) { + if (metadata.name() == null) { + throw new NullPointerException("Use errorsByTopicId() when manage topic using topic id"); + } if (metadata.errorCode() != Errors.NONE.code()) errors.put(metadata.name(), Errors.forCode(metadata.errorCode())); } return errors; } + /** + * Get a map of the topicIds which had metadata errors + * @return the map + */ + public Map<Uuid, Errors> errorsByTopicId() { + Map<Uuid, Errors> errors = new HashMap<>(); + for (MetadataResponseTopic metadata : data.topics()) { + if (metadata.topicId() == Uuid.ZERO_UUID) { + throw new NullPointerException("Use errors() when manage topic using topic name"); Review comment: As before, IllegalStateException? ########## File path: core/src/main/scala/kafka/admin/TopicCommand.scala ########## @@ -313,42 +313,57 @@ object TopicCommand extends Logging { } override def describeTopic(opts: TopicCommandOptions): Unit = { - val topics = getTopics(opts.topic, opts.excludeInternalTopics) - ensureTopicExists(topics, opts.topic, !opts.ifExists) + val topicId = opts.topicId.map(Uuid.fromString).filter(_ != Uuid.ZERO_UUID) + // if topicId is provided and not zero, will use topicId regardless of topic name Review comment: we would fail if both are provided? ########## File path: clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicIdException.java ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class UnknownTopicIdException extends ApiException { + private static final long serialVersionUID = 1L; + public UnknownTopicIdException() { + super(); Review comment: `super()` is implied, so we can just leave the method empty? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -1822,6 +1829,88 @@ void handleFailure(Throwable throwable) { return new DescribeTopicsResult(new HashMap<>(topicFutures)); } + @Override + public DescribeTopicsResultWithIds describeTopicsWithIds(Collection<Uuid> topicIds, DescribeTopicsOptions options) { + + final Map<Uuid, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicIds.size()); + final List<Uuid> topicIdsList = new ArrayList<>(); + for (Uuid topicId : topicIds) { + if (topicIdIsUnrepresentable(topicId)) { + KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); + future.completeExceptionally(new InvalidTopicException("The given topic id '" + + topicId + "' cannot be represented in a request.")); + topicFutures.put(topicId, future); + } else if (!topicFutures.containsKey(topicId)) { + topicFutures.put(topicId, new KafkaFutureImpl<>()); + topicIdsList.add(topicId); + } + } + final long now = time.milliseconds(); + Call call = new Call("describeTopicsWithIds", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + + @Override + MetadataRequest.Builder createRequest(int timeoutMs) { + return new MetadataRequest.Builder(new MetadataRequestData() + .setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList)) + .setAllowAutoTopicCreation(false) + .setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations())); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + MetadataResponse response = (MetadataResponse) abstractResponse; + // Handle server responses for particular topics. + Cluster cluster = response.cluster(); + Map<Uuid, Errors> errors = response.errorsByTopicId(); + for (Map.Entry<Uuid, KafkaFutureImpl<TopicDescription>> entry : topicFutures.entrySet()) { + Uuid topicId = entry.getKey(); + KafkaFutureImpl<TopicDescription> future = entry.getValue(); + + String topicName = cluster.topicName(topicId); + if (topicName == null) { + future.completeExceptionally(new UnknownTopicIdException("TopicId " + topicId + " not found.")); + continue; + } + Errors topicError = errors.get(topicId); + if (topicError != null) { + future.completeExceptionally(topicError.exception()); + continue; + } + + boolean isInternal = cluster.internalTopics().contains(topicName); + List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topicName); + List<TopicPartitionInfo> partitions = new ArrayList<>(partitionInfos.size()); + for (PartitionInfo partitionInfo : partitionInfos) { + TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo( + partitionInfo.partition(), leader(partitionInfo), Arrays.asList(partitionInfo.replicas()), + Arrays.asList(partitionInfo.inSyncReplicas())); + partitions.add(topicPartitionInfo); + } + partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition)); + TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions, + validAclOperations(response.topicAuthorizedOperations(topicName).get()), topicId); + future.complete(topicDescription); Review comment: Could we move the code shared between `describeTopics` and `describeTopicsWithIds` to a common private method? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -1190,8 +1192,31 @@ class KafkaApis(val requestChannel: RequestChannel, val metadataRequest = request.body[MetadataRequest] val requestVersion = request.header.apiVersion + // Check if topicId is presented firstly. + val topicIds = metadataRequest.topicIds.asScala.toSet.filterNot(_ == Uuid.ZERO_UUID) + val supportedVersionTopicIds = if (config.interBrokerProtocolVersion >= KAFKA_2_8_IV1) topicIds else Set.empty[Uuid] + + val unSupportedVersionTopicIds = topicIds.diff(supportedVersionTopicIds) Review comment: `unSupported` => `unsupported` since it is one word (below as well) ########## File path: core/src/main/scala/kafka/admin/TopicCommand.scala ########## @@ -313,42 +313,54 @@ object TopicCommand extends Logging { } override def describeTopic(opts: TopicCommandOptions): Unit = { - val topics = getTopics(opts.topic, opts.excludeInternalTopics) - ensureTopicExists(topics, opts.topic, !opts.ifExists) + val topicId = opts.topicId.map(Uuid.fromString).filter(_ != Uuid.ZERO_UUID) + val topics = if (topicId.isEmpty) + getTopics(opts.topic, opts.excludeInternalTopics) + else + Seq() - if (topics.nonEmpty) { - val allConfigs = adminClient.describeConfigs(topics.map(new ConfigResource(Type.TOPIC, _)).asJavaCollection).values() - val liveBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()) - val topicDescriptions = adminClient.describeTopics(topics.asJavaCollection).all().get().values().asScala - val describeOptions = new DescribeOptions(opts, liveBrokers.toSet) - val topicPartitions = topicDescriptions - .flatMap(td => td.partitions.iterator().asScala.map(p => new TopicPartition(td.name(), p.partition()))) - .toSet.asJava - val reassignments = listAllReassignments(topicPartitions) - - for (td <- topicDescriptions) { - val topicName = td.name - val topicId = td.topicId() - val config = allConfigs.get(new ConfigResource(Type.TOPIC, topicName)).get() - val sortedPartitions = td.partitions.asScala.sortBy(_.partition) - - if (describeOptions.describeConfigs) { - val hasNonDefault = config.entries().asScala.exists(!_.isDefault) - if (!opts.reportOverriddenConfigs || hasNonDefault) { - val numPartitions = td.partitions().size - val firstPartition = td.partitions.iterator.next() - val reassignment = reassignments.get(new TopicPartition(td.name, firstPartition.partition)) - val topicDesc = TopicDescription(topicName, topicId, numPartitions, getReplicationFactor(firstPartition, reassignment), config, markedForDeletion = false) - topicDesc.printDescription() - } + if (topicId.isEmpty) + ensureTopicExists(topics, opts.topic, !opts.ifExists) Review comment: Since listTopics uses MetadataRequest, we just need to add topic ids from the MetadataResponse to TopicListing? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org