This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d497250c22e KAFKA-18999 Remove BrokerMetadata (#19227)
d497250c22e is described below
commit d497250c22e25cb980a6f20c0afc5c332366fa99
Author: PoAn Yang <[email protected]>
AuthorDate: Sat Mar 22 19:30:28 2025 +0800
KAFKA-18999 Remove BrokerMetadata (#19227)
* Replace `BrokerMetadata` with `UsableBroker` in KRaftMetadataCache and
ReassignPartitionsCommand.
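* The conversion is mechanical: wherever the tools previously built a
`BrokerMetadata(id, rack)`, they now build a `UsableBroker(id, rack, fenced)`,
always passing `false` for the new `fenced` flag. A minimal sketch of the
mapping applied in `ReassignPartitionsCommand#getBrokerMetadata` (the helper
class and method names below are illustrative, not part of this commit):

    import java.util.Optional;

    import org.apache.kafka.common.Node;
    import org.apache.kafka.metadata.placement.UsableBroker;

    final class NodeToUsableBroker {
        // Keep the rack only when rack awareness is enabled and the node
        // actually reports one; the fenced flag is always false here.
        static UsableBroker fromNode(Node node, boolean enableRackAwareness) {
            Optional<String> rack = (enableRackAwareness && node.rack() != null)
                ? Optional.of(node.rack())
                : Optional.empty();
            return new UsableBroker(node.id(), rack, false);
        }
    }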
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/server/metadata/KRaftMetadataCache.scala | 14 ++----
.../org/apache/kafka/admin/BrokerMetadata.java | 52 ----------------------
.../tools/reassign/ReassignPartitionsCommand.java | 25 +++++------
.../tools/reassign/ReassignPartitionsUnitTest.java | 14 +++---
4 files changed, 21 insertions(+), 84 deletions(-)
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 63d6a27f0a0..fb18da6872a 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -18,7 +18,6 @@
package kafka.server.metadata
import kafka.utils.Logging
-import org.apache.kafka.admin.BrokerMetadata
import org.apache.kafka.common._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.InvalidTopicException
@@ -341,13 +340,6 @@ class KRaftMetadataCache(
Option(_currentImage.cluster.broker(brokerId)).count(_.inControlledShutdown) == 1
}
- private def getAliveBrokers(image: MetadataImage): util.List[BrokerMetadata] = {
- image.cluster().brokers().values().stream()
- .filter(Predicate.not(_.fenced))
- .map(broker => new BrokerMetadata(broker.id, broker.rack))
- .collect(Collectors.toList())
- }
-
override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): util.Optional[Node] = {
util.Optional.ofNullable(_currentImage.cluster().broker(brokerId))
.filter(Predicate.not(_.fenced))
@@ -422,11 +414,13 @@ class KRaftMetadataCache(
}
private def getRandomAliveBroker(image: MetadataImage): util.Optional[Integer] = {
- val aliveBrokers = getAliveBrokers(image)
+ val aliveBrokers = image.cluster().brokers().values().stream()
+ .filter(Predicate.not(_.fenced))
+ .map(_.id()).toList
if (aliveBrokers.isEmpty) {
util.Optional.empty()
} else {
- util.Optional.of(aliveBrokers.get(ThreadLocalRandom.current().nextInt(aliveBrokers.size)).id)
+ util.Optional.of(aliveBrokers.get(ThreadLocalRandom.current().nextInt(aliveBrokers.size)))
}
}
diff --git a/server-common/src/main/java/org/apache/kafka/admin/BrokerMetadata.java b/server-common/src/main/java/org/apache/kafka/admin/BrokerMetadata.java
deleted file mode 100644
index 98216fa0858..00000000000
--- a/server-common/src/main/java/org/apache/kafka/admin/BrokerMetadata.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.admin;
-
-import java.util.Objects;
-import java.util.Optional;
-
-/**
- * Broker metadata used by admin tools.
- */
-public class BrokerMetadata {
- public final int id;
-
- public final Optional<String> rack;
-
- /**
- * @param id an integer that uniquely identifies this broker
- * @param rack the rack of the broker, which is used to in rack aware partition assignment for fault tolerance.
- * Examples: "RACK1", "us-east-1d"
- */
- public BrokerMetadata(int id, Optional<String> rack) {
- this.id = id;
- this.rack = rack;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- BrokerMetadata that = (BrokerMetadata) o;
- return id == that.id && Objects.equals(rack, that.rack);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(id, rack);
- }
-}
diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
index 33bf23d13d1..6c215d5025a 100644
--- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.tools.reassign;
-import org.apache.kafka.admin.BrokerMetadata;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
@@ -569,8 +568,8 @@ public class ReassignPartitionsCommand {
List<String> topicsToReassign = t0.getValue();
Map<TopicPartition, List<Integer>> currentAssignments = getReplicaAssignmentForTopics(adminClient, topicsToReassign);
- List<BrokerMetadata> brokerMetadatas = getBrokerMetadata(adminClient, brokersToReassign, enableRackAwareness);
- Map<TopicPartition, List<Integer>> proposedAssignments = calculateAssignment(currentAssignments, brokerMetadatas);
+ List<UsableBroker> usableBrokers = getBrokerMetadata(adminClient, brokersToReassign, enableRackAwareness);
+ Map<TopicPartition, List<Integer>> proposedAssignments = calculateAssignment(currentAssignments, usableBrokers);
System.out.printf("Current partition replica assignment%n%s%n%n",
formatAsReassignmentJson(currentAssignments, Collections.emptyMap()));
System.out.printf("Proposed partition reassignment configuration%n%s%n",
@@ -582,12 +581,12 @@ public class ReassignPartitionsCommand {
* Calculate the new partition assignments to suggest in --generate.
*
* @param currentAssignment The current partition assignments.
- * @param brokerMetadatas The rack information for each broker.
+ * @param brokers The rack information for each broker.
*
* @return A map from partitions to the proposed assignments for each.
*/
private static Map<TopicPartition, List<Integer>> calculateAssignment(Map<TopicPartition, List<Integer>> currentAssignment,
- List<BrokerMetadata> brokerMetadatas) {
+ List<UsableBroker> usableBrokers) {
Map<String, List<Entry<TopicPartition, List<Integer>>>> groupedByTopic = new HashMap<>();
for (Entry<TopicPartition, List<Integer>> e : currentAssignment.entrySet())
groupedByTopic.computeIfAbsent(e.getKey().topic(), k -> new ArrayList<>()).add(e);
@@ -601,11 +600,7 @@ public class ReassignPartitionsCommand {
new ClusterDescriber() {
@Override
public Iterator<UsableBroker> usableBrokers() {
- return brokerMetadatas.stream().map(brokerMetadata -> new UsableBroker(
- brokerMetadata.id,
- brokerMetadata.rack,
- false
- )).iterator();
+ return usableBrokers.iterator();
}
@Override
@@ -701,16 +696,16 @@ public class ReassignPartitionsCommand {
* @return The metadata for each broker that was found.
* Brokers that were not found will be omitted.
*/
- static List<BrokerMetadata> getBrokerMetadata(Admin adminClient, List<Integer> brokers, boolean enableRackAwareness) throws ExecutionException, InterruptedException {
+ static List<UsableBroker> getBrokerMetadata(Admin adminClient, List<Integer> brokers, boolean enableRackAwareness) throws ExecutionException, InterruptedException {
Set<Integer> brokerSet = new HashSet<>(brokers);
- List<BrokerMetadata> results = adminClient.describeCluster().nodes().get().stream()
+ List<UsableBroker> results = adminClient.describeCluster().nodes().get().stream()
.filter(node -> brokerSet.contains(node.id()))
.map(node -> (enableRackAwareness && node.rack() != null)
- ? new BrokerMetadata(node.id(), Optional.of(node.rack()))
- : new BrokerMetadata(node.id(), Optional.empty())
+ ? new UsableBroker(node.id(), Optional.of(node.rack()), false)
+ : new UsableBroker(node.id(), Optional.empty(), false)
).collect(Collectors.toList());
- long numRackless = results.stream().filter(m -> m.rack.isEmpty()).count();
+ long numRackless = results.stream().filter(m -> m.rack().isEmpty()).count();
if (enableRackAwareness && numRackless != 0 && numRackless != results.size()) {
throw new AdminOperationException("Not all brokers have rack information. Add " +
"--disable-rack-aware in command line to make replica assignment without rack " +
diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
index 793eba842d2..335e8fba693 100644
--- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.tools.reassign;
-import org.apache.kafka.admin.BrokerMetadata;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.PartitionReassignment;
@@ -29,6 +28,7 @@ import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.AdminCommandFailedException;
import org.apache.kafka.server.common.AdminOperationException;
import org.apache.kafka.server.config.QuotaConfig;
@@ -317,19 +317,19 @@ public class ReassignPartitionsUnitTest {
build()) {
assertEquals(asList(
- new BrokerMetadata(0, Optional.of("rack0")),
- new BrokerMetadata(1, Optional.of("rack1"))
+ new UsableBroker(0, Optional.of("rack0"), false),
+ new UsableBroker(1, Optional.of("rack1"), false)
), getBrokerMetadata(adminClient, asList(0, 1), true));
assertEquals(asList(
- new BrokerMetadata(0, Optional.empty()),
- new BrokerMetadata(1, Optional.empty())
+ new UsableBroker(0, Optional.empty(), false),
+ new UsableBroker(1, Optional.empty(), false)
), getBrokerMetadata(adminClient, asList(0, 1), false));
assertStartsWith("Not all brokers have rack information",
assertThrows(AdminOperationException.class,
() -> getBrokerMetadata(adminClient, asList(1, 2), true)).getMessage());
assertEquals(asList(
- new BrokerMetadata(1, Optional.empty()),
- new BrokerMetadata(2, Optional.empty())
+ new UsableBroker(1, Optional.empty(), false),
+ new UsableBroker(2, Optional.empty(), false)
), getBrokerMetadata(adminClient, asList(1, 2), false));
}
}