This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c0add50a991 MINOR: Add interface for aliveBroker and isShutDwon for 
Brokers. (#16323)
c0add50a991 is described below

commit c0add50a991a1f9d57d9f00a8b205583a513ee92
Author: TaiJuWu <tjwu1...@gmail.com>
AuthorDate: Wed Jun 19 22:32:08 2024 +0800

    MINOR: Add interface for aliveBroker and isShutDwon for Brokers. (#16323)
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 core/src/main/scala/kafka/server/BrokerServer.scala       |  4 ++++
 core/src/main/scala/kafka/server/KafkaBroker.scala        |  1 +
 core/src/main/scala/kafka/server/KafkaServer.scala        |  7 +++++++
 core/src/test/java/kafka/test/ClusterInstance.java        |  5 +++++
 .../test/java/kafka/test/ClusterTestExtensionsTest.java   | 15 +++++++++++++++
 5 files changed, 32 insertions(+)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index a03128ab9f5..723e6b01b61 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -733,6 +733,10 @@ class BrokerServer(
     }
   }
 
+  override def isShutdown(): Boolean = {
+    status == SHUTDOWN || status == SHUTTING_DOWN
+  }
+
   override def awaitShutdown(): Unit = {
     lock.lock()
     try {
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala 
b/core/src/main/scala/kafka/server/KafkaBroker.scala
index 9e1ee3d6941..59a7c9c7ba0 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -97,6 +97,7 @@ trait KafkaBroker extends Logging {
   def awaitShutdown(): Unit
   def shutdown(): Unit = shutdown(Duration.ofMinutes(5))
   def shutdown(timeout: Duration): Unit
+  def isShutdown(): Boolean
   def brokerTopicStats: BrokerTopicStats
   def credentialProvider: CredentialProvider
   def clientToControllerChannelManager: NodeToControllerChannelManager
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 3154ab1c344..df9d631dca5 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -1085,6 +1085,13 @@ class KafkaServer(
     }
   }
 
+  override def isShutdown(): Boolean = {
+    BrokerState.fromValue(brokerState.value()) match {
+      case BrokerState.SHUTTING_DOWN | BrokerState.NOT_RUNNING => true
+      case _ => false
+    }
+  }
+
   /**
    * After calling shutdown(), use this API to wait until the shutdown is 
complete
    */
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java 
b/core/src/test/java/kafka/test/ClusterInstance.java
index 6b85f12ba1b..d11d0764c8c 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -52,6 +52,11 @@ public interface ClusterInstance {
 
     Map<Integer, KafkaBroker> brokers();
 
+    default Map<Integer, KafkaBroker> aliveBrokers() {
+        return brokers().entrySet().stream().filter(entry -> 
!entry.getValue().isShutdown())
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+    }
+
     Map<Integer, ControllerServer> controllers();
 
     /**
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java 
b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 49add6ab353..c698632094e 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -234,4 +234,19 @@ public class ClusterTestExtensionsTest {
         
Assertions.assertTrue(clusterInstance.supportedGroupProtocols().contains(CLASSIC));
         Assertions.assertEquals(1, 
clusterInstance.supportedGroupProtocols().size());
     }
+
+    @ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
+    public void testClusterAliveBrokers(ClusterInstance clusterInstance) 
throws Exception {
+        clusterInstance.waitForReadyBrokers();
+
+        // Remove broker id 0
+        clusterInstance.shutdownBroker(0);
+        Assertions.assertFalse(clusterInstance.aliveBrokers().containsKey(0));
+        Assertions.assertTrue(clusterInstance.brokers().containsKey(0));
+
+        // add broker id 0 back
+        clusterInstance.startBroker(0);
+        Assertions.assertTrue(clusterInstance.aliveBrokers().containsKey(0));
+        Assertions.assertTrue(clusterInstance.brokers().containsKey(0));
+    }
 }

Reply via email to