chia7712 commented on code in PR #18801:
URL: https://github.com/apache/kafka/pull/18801#discussion_r1973109296
##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -345,68 +352,75 @@ class KRaftMetadataCache(
Option(_currentImage.cluster.broker(brokerId)).count(_.inControlledShutdown) ==
1
}
- override def getAliveBrokers(): Iterable[BrokerMetadata] =
getAliveBrokers(_currentImage)
+ override def getAliveBrokers(): util.List[BrokerMetadata] =
getAliveBrokers(_currentImage)
- private def getAliveBrokers(image: MetadataImage): Iterable[BrokerMetadata]
= {
- image.cluster().brokers().values().asScala.filterNot(_.fenced()).
- map(b => new BrokerMetadata(b.id, b.rack))
+ private def getAliveBrokers(image: MetadataImage): util.List[BrokerMetadata]
= {
+ _currentImage.cluster().brokers().values().stream()
Review Comment:
Please use the `image` parameter here instead of `_currentImage`; otherwise the argument passed to this method is ignored.
##########
metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.admin.BrokerMetadata;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
+import
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.FinalizedFeatures;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public interface MetadataCache extends ConfigRepository {
+
+ /**
+ * Return topic metadata for a given set of topics and listener. See
KafkaApis#handleTopicMetadataRequest for details
+ * on the use of the two boolean flags.
+ *
+ * @param topics The set of topics.
+ * @param listenerName The listener name.
+ * @param errorUnavailableEndpoints If true, we return an error on
unavailable brokers. This is used to support
+ * MetadataResponse version 0.
+ * @param errorUnavailableListeners If true, return LEADER_NOT_AVAILABLE
if the listener is not found on the leader.
+ * This is used for MetadataResponse
versions 0-5.
+ * @return A collection of topic metadata.
+ */
+ List<MetadataResponseData.MetadataResponseTopic> getTopicMetadata(
+ Set<String> topics,
+ ListenerName listenerName,
+ boolean errorUnavailableEndpoints,
+ boolean errorUnavailableListeners);
+
+ Set<String> getAllTopics();
+
+ Set<TopicPartition> getTopicPartitions(String topicName);
+
+ boolean hasAliveBroker(int brokerId);
+
+ List<BrokerMetadata> getAliveBrokers();
+
+ Optional<Long> getAliveBrokerEpoch(int brokerId);
+
+ boolean isBrokerFenced(int brokerId);
+
+ boolean isBrokerShuttingDown(int brokerId);
+
+ Uuid getTopicId(String topicName);
+
+ Optional<String> getTopicName(Uuid topicId);
+
+ Optional<Node> getAliveBrokerNode(int brokerId, ListenerName listenerName);
+
+ List<Node> getAliveBrokerNodes(ListenerName listenerName);
+
+ List<Node> getBrokerNodes(ListenerName listenerName);
+
+ Optional<LeaderAndIsr> getLeaderAndIsr(String topic, int partitionId);
+
+ /**
+ * Return the number of partitions in the given topic, or None if the
given topic does not exist.
+ */
+ Optional<Integer> numPartitions(String topic);
+
+ Map<String, Uuid> topicNamesToIds();
Review Comment:
This method looks useless - maybe we can remove it to simplify the interface.
##########
metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.admin.BrokerMetadata;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
+import
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.FinalizedFeatures;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public interface MetadataCache extends ConfigRepository {
+
+ /**
+ * Return topic metadata for a given set of topics and listener. See
KafkaApis#handleTopicMetadataRequest for details
+ * on the use of the two boolean flags.
+ *
+ * @param topics The set of topics.
+ * @param listenerName The listener name.
+ * @param errorUnavailableEndpoints If true, we return an error on
unavailable brokers. This is used to support
+ * MetadataResponse version 0.
+ * @param errorUnavailableListeners If true, return LEADER_NOT_AVAILABLE
if the listener is not found on the leader.
+ * This is used for MetadataResponse
versions 0-5.
+ * @return A collection of topic metadata.
+ */
+ List<MetadataResponseData.MetadataResponseTopic> getTopicMetadata(
+ Set<String> topics,
+ ListenerName listenerName,
+ boolean errorUnavailableEndpoints,
+ boolean errorUnavailableListeners);
+
+ Set<String> getAllTopics();
+
+ Set<TopicPartition> getTopicPartitions(String topicName);
+
+ boolean hasAliveBroker(int brokerId);
+
+ List<BrokerMetadata> getAliveBrokers();
+
+ Optional<Long> getAliveBrokerEpoch(int brokerId);
+
+ boolean isBrokerFenced(int brokerId);
+
+ boolean isBrokerShuttingDown(int brokerId);
+
+ Uuid getTopicId(String topicName);
+
+ Optional<String> getTopicName(Uuid topicId);
+
+ Optional<Node> getAliveBrokerNode(int brokerId, ListenerName listenerName);
+
+ List<Node> getAliveBrokerNodes(ListenerName listenerName);
+
+ List<Node> getBrokerNodes(ListenerName listenerName);
+
+ Optional<LeaderAndIsr> getLeaderAndIsr(String topic, int partitionId);
+
+ /**
+ * Return the number of partitions in the given topic, or None if the
given topic does not exist.
+ */
+ Optional<Integer> numPartitions(String topic);
+
+ Map<String, Uuid> topicNamesToIds();
+
+ Map<Uuid, String> topicIdsToNames();
+
+ Map.Entry<Map<String, Uuid>, Map<Uuid, String>> topicIdInfo();
Review Comment:
ditto
##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -239,35 +239,37 @@ class KRaftMetadataCache(
}
// errorUnavailableEndpoints exists to support v0 MetadataResponses
- override def getTopicMetadata(topics: Set[String],
+ override def getTopicMetadata(topics: util.Set[String],
listenerName: ListenerName,
errorUnavailableEndpoints: Boolean = false,
- errorUnavailableListeners: Boolean = false):
Seq[MetadataResponseTopic] = {
+ errorUnavailableListeners: Boolean = false):
util.List[MetadataResponseTopic] = {
val image = _currentImage
- topics.toSeq.flatMap { topic =>
- getPartitionMetadata(image, topic, listenerName,
errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
- new MetadataResponseTopic()
- .setErrorCode(Errors.NONE.code)
- .setName(topic)
-
.setTopicId(Option(image.topics().getTopic(topic).id()).getOrElse(Uuid.ZERO_UUID))
- .setIsInternal(Topic.isInternal(topic))
- .setPartitions(partitionMetadata.toBuffer.asJava)
+ topics.stream().flatMap(topic =>
+ getPartitionMetadata(image, topic, listenerName,
errorUnavailableEndpoints, errorUnavailableListeners) match {
+ case Some(partitionMetadata) =>
+ util.stream.Stream.of(new MetadataResponseTopic()
+ .setErrorCode(Errors.NONE.code)
+ .setName(topic)
+
.setTopicId(Option(image.topics().getTopic(topic).id()).getOrElse(Uuid.ZERO_UUID))
+ .setIsInternal(Topic.isInternal(topic))
+ .setPartitions(partitionMetadata.toBuffer.asJava))
+ case None => util.stream.Stream.empty()
Review Comment:
please remove the redundant space
##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java:
##########
@@ -290,7 +290,7 @@ default void waitForTopic(String topic, int partitions)
throws InterruptedExcept
TestUtils.waitForCondition(
() -> brokers.stream().allMatch(broker -> partitions == 0 ?
broker.metadataCache().numPartitions(topic).isEmpty() :
-
broker.metadataCache().numPartitions(topic).contains(partitions)
+ broker.metadataCache().numPartitions(topic).get() == partitions
Review Comment:
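`Optional.get()` throws `NoSuchElementException` when the Optional is empty. Please use this instead:
`broker.metadataCache().numPartitions(topic).filter(p -> p == partitions).isPresent()`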
##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -239,35 +239,37 @@ class KRaftMetadataCache(
}
// errorUnavailableEndpoints exists to support v0 MetadataResponses
- override def getTopicMetadata(topics: Set[String],
+ override def getTopicMetadata(topics: util.Set[String],
listenerName: ListenerName,
errorUnavailableEndpoints: Boolean = false,
- errorUnavailableListeners: Boolean = false):
Seq[MetadataResponseTopic] = {
+ errorUnavailableListeners: Boolean = false):
util.List[MetadataResponseTopic] = {
val image = _currentImage
- topics.toSeq.flatMap { topic =>
- getPartitionMetadata(image, topic, listenerName,
errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
- new MetadataResponseTopic()
- .setErrorCode(Errors.NONE.code)
- .setName(topic)
-
.setTopicId(Option(image.topics().getTopic(topic).id()).getOrElse(Uuid.ZERO_UUID))
- .setIsInternal(Topic.isInternal(topic))
- .setPartitions(partitionMetadata.toBuffer.asJava)
+ topics.stream().flatMap(topic =>
+ getPartitionMetadata(image, topic, listenerName,
errorUnavailableEndpoints, errorUnavailableListeners) match {
+ case Some(partitionMetadata) =>
+ util.stream.Stream.of(new MetadataResponseTopic()
+ .setErrorCode(Errors.NONE.code)
+ .setName(topic)
+
.setTopicId(Option(image.topics().getTopic(topic).id()).getOrElse(Uuid.ZERO_UUID))
+ .setIsInternal(Topic.isInternal(topic))
+ .setPartitions(partitionMetadata.toBuffer.asJava))
+ case None => util.stream.Stream.empty()
}
- }
+ ).collect(Collectors.toList())
}
override def describeTopicResponse(
- topics: Iterator[String],
+ topics: util.Iterator[String],
listenerName: ListenerName,
- topicPartitionStartIndex: String => Int,
+ topicPartitionStartIndex: util.function.Function[String, Integer],
maximumNumberOfPartitions: Int,
ignoreTopicsWithExceptions: Boolean
): DescribeTopicPartitionsResponseData = {
val image = _currentImage
var remaining = maximumNumberOfPartitions
val result = new DescribeTopicPartitionsResponseData()
breakable {
- topics.foreach { topicName =>
+ topics.asScala.foreach { topicName =>
Review Comment:
Could we use `topics.forEachRemaining` here instead of converting with `asScala`?
##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -345,68 +352,75 @@ class KRaftMetadataCache(
Option(_currentImage.cluster.broker(brokerId)).count(_.inControlledShutdown) ==
1
}
- override def getAliveBrokers(): Iterable[BrokerMetadata] =
getAliveBrokers(_currentImage)
+ override def getAliveBrokers(): util.List[BrokerMetadata] =
getAliveBrokers(_currentImage)
- private def getAliveBrokers(image: MetadataImage): Iterable[BrokerMetadata]
= {
- image.cluster().brokers().values().asScala.filterNot(_.fenced()).
- map(b => new BrokerMetadata(b.id, b.rack))
+ private def getAliveBrokers(image: MetadataImage): util.List[BrokerMetadata]
= {
+ _currentImage.cluster().brokers().values().stream()
+ .filter(Predicate.not(_.fenced))
+ .map(broker => new BrokerMetadata(broker.id, broker.rack))
+ .collect(Collectors.toList())
}
- override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName):
Option[Node] = {
- Option(_currentImage.cluster().broker(brokerId)).filterNot(_.fenced()).
- flatMap(_.node(listenerName.value()).toScala)
+ override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName):
util.Optional[Node] = {
+ util.Optional.ofNullable(_currentImage.cluster().broker(brokerId))
+ .filter(Predicate.not(_.fenced))
+ .flatMap(broker => broker.node(listenerName.value))
}
- override def getAliveBrokerNodes(listenerName: ListenerName): Seq[Node] = {
- _currentImage.cluster().brokers().values().asScala.filterNot(_.fenced()).
- flatMap(_.node(listenerName.value()).toScala).toSeq
+ override def getAliveBrokerNodes(listenerName: ListenerName):
util.List[Node] = {
+ _currentImage.cluster.brokers.values.stream
+ .filter(Predicate.not(_.fenced))
+ .flatMap(broker => broker.node(listenerName.value).stream)
+ .collect(Collectors.toList())
}
- override def getBrokerNodes(listenerName: ListenerName): Seq[Node] = {
-
_currentImage.cluster().brokers().values().asScala.flatMap(_.node(listenerName.value()).toScala).toSeq
+ override def getBrokerNodes(listenerName: ListenerName): util.List[Node] = {
+ _currentImage.cluster.brokers.values.stream
+ .flatMap(broker => broker.node(listenerName.value).stream)
+ .collect(Collectors.toList())
}
- override def getLeaderAndIsr(topicName: String, partitionId: Int):
Option[LeaderAndIsr] = {
- Option(_currentImage.topics().getTopic(topicName)).
- flatMap(topic => Option(topic.partitions().get(partitionId))).
- flatMap(partition => Some(new LeaderAndIsr(partition.leader,
partition.leaderEpoch,
+ override def getLeaderAndIsr(topicName: String, partitionId: Int):
util.Optional[LeaderAndIsr] = {
+ util.Optional.ofNullable(_currentImage.topics().getTopic(topicName)).
+ flatMap(topic =>
util.Optional.ofNullable(topic.partitions().get(partitionId))).
+ flatMap(partition => util.Optional.ofNullable(new
LeaderAndIsr(partition.leader, partition.leaderEpoch,
util.Arrays.asList(partition.isr.map(i => i: java.lang.Integer): _*),
partition.leaderRecoveryState, partition.partitionEpoch)))
}
- override def numPartitions(topicName: String): Option[Int] = {
- Option(_currentImage.topics().getTopic(topicName)).
+ override def numPartitions(topicName: String): util.Optional[Integer] = {
+ util.Optional.ofNullable(_currentImage.topics().getTopic(topicName)).
map(topic => topic.partitions().size())
}
override def topicNamesToIds(): util.Map[String, Uuid] =
_currentImage.topics.topicNameToIdView()
override def topicIdsToNames(): util.Map[Uuid, String] =
_currentImage.topics.topicIdToNameView()
- override def topicIdInfo(): (util.Map[String, Uuid], util.Map[Uuid, String])
= {
+ override def topicIdInfo(): util.Map.Entry[util.Map[String, Uuid],
util.Map[Uuid, String]] = {
val image = _currentImage
- (image.topics.topicNameToIdView(), image.topics.topicIdToNameView())
+ new util.AbstractMap.SimpleEntry(image.topics.topicNameToIdView(),
image.topics.topicIdToNameView())
}
// if the leader is not known, return None;
// if the leader is known and corresponding node is available, return
Some(node)
// if the leader is known but corresponding node with the listener name is
not available, return Some(NO_NODE)
- override def getPartitionLeaderEndpoint(topicName: String, partitionId: Int,
listenerName: ListenerName): Option[Node] = {
+ override def getPartitionLeaderEndpoint(topicName: String, partitionId: Int,
listenerName: ListenerName): util.Optional[Node] = {
val image = _currentImage
Option(image.topics().getTopic(topicName)) match {
- case None => None
+ case None => util.Optional.empty()
case Some(topic) => Option(topic.partitions().get(partitionId)) match {
- case None => None
+ case None => util.Optional.empty()
case Some(partition) =>
Option(image.cluster().broker(partition.leader)) match {
- case None => Some(Node.noNode)
- case Some(broker) =>
Some(broker.node(listenerName.value()).orElse(Node.noNode()))
+ case None => util.Optional.of(Node.noNode)
+ case Some(broker) =>
util.Optional.of(broker.node(listenerName.value()).orElse(Node.noNode()))
}
}
}
}
- override def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName:
ListenerName): Map[Int, Node] = {
+ override def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName:
ListenerName): util.Map[Integer, Node] = {
val image = _currentImage
- val result = new mutable.HashMap[Int, Node]()
+ val result = new mutable.HashMap[Integer, Node]()
Review Comment:
Could you please use a Java `Map` instead?
##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -186,9 +188,9 @@ class AddPartitionsToTxnManager(
}
private def getTransactionCoordinator(partition: Int): Option[Node] = {
Review Comment:
We can use Java `Optional` here, as this method has only one usage.
##########
metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.admin.BrokerMetadata;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
+import
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.FinalizedFeatures;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public interface MetadataCache extends ConfigRepository {
+
+ /**
+ * Return topic metadata for a given set of topics and listener. See
KafkaApis#handleTopicMetadataRequest for details
+ * on the use of the two boolean flags.
+ *
+ * @param topics The set of topics.
+ * @param listenerName The listener name.
+ * @param errorUnavailableEndpoints If true, we return an error on
unavailable brokers. This is used to support
+ * MetadataResponse version 0.
+ * @param errorUnavailableListeners If true, return LEADER_NOT_AVAILABLE
if the listener is not found on the leader.
+ * This is used for MetadataResponse
versions 0-5.
+ * @return A collection of topic metadata.
+ */
+ List<MetadataResponseData.MetadataResponseTopic> getTopicMetadata(
+ Set<String> topics,
+ ListenerName listenerName,
+ boolean errorUnavailableEndpoints,
+ boolean errorUnavailableListeners);
+
+ Set<String> getAllTopics();
+
+ Set<TopicPartition> getTopicPartitions(String topicName);
+
+ boolean hasAliveBroker(int brokerId);
+
+ List<BrokerMetadata> getAliveBrokers();
+
+ Optional<Long> getAliveBrokerEpoch(int brokerId);
+
+ boolean isBrokerFenced(int brokerId);
+
+ boolean isBrokerShuttingDown(int brokerId);
+
+ Uuid getTopicId(String topicName);
+
+ Optional<String> getTopicName(Uuid topicId);
+
+ Optional<Node> getAliveBrokerNode(int brokerId, ListenerName listenerName);
+
+ List<Node> getAliveBrokerNodes(ListenerName listenerName);
+
+ List<Node> getBrokerNodes(ListenerName listenerName);
+
+ Optional<LeaderAndIsr> getLeaderAndIsr(String topic, int partitionId);
+
+ /**
+ * Return the number of partitions in the given topic, or None if the
given topic does not exist.
+ */
+ Optional<Integer> numPartitions(String topic);
+
+ Map<String, Uuid> topicNamesToIds();
+
+ Map<Uuid, String> topicIdsToNames();
+
+ Map.Entry<Map<String, Uuid>, Map<Uuid, String>> topicIdInfo();
+
+ /**
+ * Get a partition leader's endpoint
+ *
+ * @return If the leader is known, and the listener name is available,
return Some(node). If the leader is known,
+ * but the listener is unavailable, return Some(Node.NO_NODE).
Otherwise, if the leader is not known,
+ * return None
+ */
+ Optional<Node> getPartitionLeaderEndpoint(String topic, int partitionId,
ListenerName listenerName);
+
+ Map<Integer, Node> getPartitionReplicaEndpoints(TopicPartition tp,
ListenerName listenerName);
+
+ Cluster getClusterMetadata(String clusterId, ListenerName listenerName);
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]