This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d497250c22e KAFKA-18999 Remove BrokerMetadata (#19227)
d497250c22e is described below

commit d497250c22e25cb980a6f20c0afc5c332366fa99
Author: PoAn Yang <[email protected]>
AuthorDate: Sat Mar 22 19:30:28 2025 +0800

    KAFKA-18999 Remove BrokerMetadata (#19227)
    
    * Replace `BrokerMetadata` with `UsableBroker` in KRaftMetadataCache and
    ReassignPartitionsCommand.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/server/metadata/KRaftMetadataCache.scala | 14 ++----
 .../org/apache/kafka/admin/BrokerMetadata.java     | 52 ----------------------
 .../tools/reassign/ReassignPartitionsCommand.java  | 25 +++++------
 .../tools/reassign/ReassignPartitionsUnitTest.java | 14 +++---
 4 files changed, 21 insertions(+), 84 deletions(-)

diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala 
b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 63d6a27f0a0..fb18da6872a 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -18,7 +18,6 @@
 package kafka.server.metadata
 
 import kafka.utils.Logging
-import org.apache.kafka.admin.BrokerMetadata
 import org.apache.kafka.common._
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.InvalidTopicException
@@ -341,13 +340,6 @@ class KRaftMetadataCache(
     
Option(_currentImage.cluster.broker(brokerId)).count(_.inControlledShutdown) == 
1
   }
 
-  private def getAliveBrokers(image: MetadataImage): util.List[BrokerMetadata] 
= {
-    image.cluster().brokers().values().stream()
-      .filter(Predicate.not(_.fenced))
-      .map(broker => new BrokerMetadata(broker.id, broker.rack))
-      .collect(Collectors.toList())
-  }
-
   override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): 
util.Optional[Node] = {
     util.Optional.ofNullable(_currentImage.cluster().broker(brokerId))
       .filter(Predicate.not(_.fenced))
@@ -422,11 +414,13 @@ class KRaftMetadataCache(
   }
 
   private def getRandomAliveBroker(image: MetadataImage): 
util.Optional[Integer] = {
-    val aliveBrokers = getAliveBrokers(image)
+    val aliveBrokers = image.cluster().brokers().values().stream()
+      .filter(Predicate.not(_.fenced))
+      .map(_.id()).toList
     if (aliveBrokers.isEmpty) {
       util.Optional.empty()
     } else {
-      
util.Optional.of(aliveBrokers.get(ThreadLocalRandom.current().nextInt(aliveBrokers.size)).id)
+      
util.Optional.of(aliveBrokers.get(ThreadLocalRandom.current().nextInt(aliveBrokers.size)))
     }
   }
 
diff --git 
a/server-common/src/main/java/org/apache/kafka/admin/BrokerMetadata.java 
b/server-common/src/main/java/org/apache/kafka/admin/BrokerMetadata.java
deleted file mode 100644
index 98216fa0858..00000000000
--- a/server-common/src/main/java/org/apache/kafka/admin/BrokerMetadata.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.admin;
-
-import java.util.Objects;
-import java.util.Optional;
-
-/**
- * Broker metadata used by admin tools.
- */
-public class BrokerMetadata {
-    public final int id;
-
-    public final Optional<String> rack;
-
-    /**
-     * @param id an integer that uniquely identifies this broker
-     * @param rack the rack of the broker, which is used to in rack aware 
partition assignment for fault tolerance.
-     *             Examples: "RACK1", "us-east-1d"
-     */
-    public BrokerMetadata(int id, Optional<String> rack) {
-        this.id = id;
-        this.rack = rack;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        BrokerMetadata that = (BrokerMetadata) o;
-        return id == that.id && Objects.equals(rack, that.rack);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(id, rack);
-    }
-}
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
index 33bf23d13d1..6c215d5025a 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.tools.reassign;
 
-import org.apache.kafka.admin.BrokerMetadata;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.AlterConfigOp;
@@ -569,8 +568,8 @@ public class ReassignPartitionsCommand {
         List<String> topicsToReassign = t0.getValue();
 
         Map<TopicPartition, List<Integer>> currentAssignments = 
getReplicaAssignmentForTopics(adminClient, topicsToReassign);
-        List<BrokerMetadata> brokerMetadatas = getBrokerMetadata(adminClient, 
brokersToReassign, enableRackAwareness);
-        Map<TopicPartition, List<Integer>> proposedAssignments = 
calculateAssignment(currentAssignments, brokerMetadatas);
+        List<UsableBroker> usableBrokers = getBrokerMetadata(adminClient, 
brokersToReassign, enableRackAwareness);
+        Map<TopicPartition, List<Integer>> proposedAssignments = 
calculateAssignment(currentAssignments, usableBrokers);
         System.out.printf("Current partition replica assignment%n%s%n%n",
             formatAsReassignmentJson(currentAssignments, 
Collections.emptyMap()));
         System.out.printf("Proposed partition reassignment 
configuration%n%s%n",
@@ -582,12 +581,12 @@ public class ReassignPartitionsCommand {
      * Calculate the new partition assignments to suggest in --generate.
      *
      * @param currentAssignment  The current partition assignments.
-     * @param brokerMetadatas    The rack information for each broker.
+     * @param brokers            The rack information for each broker.
      *
      * @return                   A map from partitions to the proposed 
assignments for each.
      */
     private static Map<TopicPartition, List<Integer>> 
calculateAssignment(Map<TopicPartition, List<Integer>> currentAssignment,
-                                                                          
List<BrokerMetadata> brokerMetadatas) {
+                                                                          
List<UsableBroker> usableBrokers) {
         Map<String, List<Entry<TopicPartition, List<Integer>>>> groupedByTopic 
= new HashMap<>();
         for (Entry<TopicPartition, List<Integer>> e : 
currentAssignment.entrySet())
             groupedByTopic.computeIfAbsent(e.getKey().topic(), k -> new 
ArrayList<>()).add(e);
@@ -601,11 +600,7 @@ public class ReassignPartitionsCommand {
                     new ClusterDescriber() {
                         @Override
                         public Iterator<UsableBroker> usableBrokers() {
-                            return brokerMetadatas.stream().map(brokerMetadata 
-> new UsableBroker(
-                                    brokerMetadata.id,
-                                    brokerMetadata.rack,
-                                    false
-                            )).iterator();
+                            return usableBrokers.iterator();
                         }
 
                         @Override
@@ -701,16 +696,16 @@ public class ReassignPartitionsCommand {
      * @return                    The metadata for each broker that was found.
      *                            Brokers that were not found will be omitted.
      */
-    static List<BrokerMetadata> getBrokerMetadata(Admin adminClient, 
List<Integer> brokers, boolean enableRackAwareness) throws ExecutionException, 
InterruptedException {
+    static List<UsableBroker> getBrokerMetadata(Admin adminClient, 
List<Integer> brokers, boolean enableRackAwareness) throws ExecutionException, 
InterruptedException {
         Set<Integer> brokerSet = new HashSet<>(brokers);
-        List<BrokerMetadata> results = 
adminClient.describeCluster().nodes().get().stream()
+        List<UsableBroker> results = 
adminClient.describeCluster().nodes().get().stream()
             .filter(node -> brokerSet.contains(node.id()))
             .map(node -> (enableRackAwareness && node.rack() != null)
-                ? new BrokerMetadata(node.id(), Optional.of(node.rack()))
-                : new BrokerMetadata(node.id(), Optional.empty())
+                ? new UsableBroker(node.id(), Optional.of(node.rack()), false)
+                : new UsableBroker(node.id(), Optional.empty(), false)
             ).collect(Collectors.toList());
 
-        long numRackless = results.stream().filter(m -> 
m.rack.isEmpty()).count();
+        long numRackless = results.stream().filter(m -> 
m.rack().isEmpty()).count();
         if (enableRackAwareness && numRackless != 0 && numRackless != 
results.size()) {
             throw new AdminOperationException("Not all brokers have rack 
information. Add " +
                 "--disable-rack-aware in command line to make replica 
assignment without rack " +
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
index 793eba842d2..335e8fba693 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.tools.reassign;
 
-import org.apache.kafka.admin.BrokerMetadata;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.admin.PartitionReassignment;
@@ -29,6 +28,7 @@ import 
org.apache.kafka.common.errors.InvalidReplicationFactorException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.placement.UsableBroker;
 import org.apache.kafka.server.common.AdminCommandFailedException;
 import org.apache.kafka.server.common.AdminOperationException;
 import org.apache.kafka.server.config.QuotaConfig;
@@ -317,19 +317,19 @@ public class ReassignPartitionsUnitTest {
             build()) {
 
             assertEquals(asList(
-                new BrokerMetadata(0, Optional.of("rack0")),
-                new BrokerMetadata(1, Optional.of("rack1"))
+                new UsableBroker(0, Optional.of("rack0"), false),
+                new UsableBroker(1, Optional.of("rack1"), false)
             ), getBrokerMetadata(adminClient, asList(0, 1), true));
             assertEquals(asList(
-                new BrokerMetadata(0, Optional.empty()),
-                new BrokerMetadata(1, Optional.empty())
+                new UsableBroker(0, Optional.empty(), false),
+                new UsableBroker(1, Optional.empty(), false)
             ), getBrokerMetadata(adminClient, asList(0, 1), false));
             assertStartsWith("Not all brokers have rack information",
                 assertThrows(AdminOperationException.class,
                     () -> getBrokerMetadata(adminClient, asList(1, 2), 
true)).getMessage());
             assertEquals(asList(
-                new BrokerMetadata(1, Optional.empty()),
-                new BrokerMetadata(2, Optional.empty())
+                new UsableBroker(1, Optional.empty(), false),
+                new UsableBroker(2, Optional.empty(), false)
             ), getBrokerMetadata(adminClient, asList(1, 2), false));
         }
     }

Reply via email to