This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new c0add50a991 MINOR: Add interface for aliveBroker and isShutDwon for Brokers. (#16323) c0add50a991 is described below commit c0add50a991a1f9d57d9f00a8b205583a513ee92 Author: TaiJuWu <tjwu1...@gmail.com> AuthorDate: Wed Jun 19 22:32:08 2024 +0800 MINOR: Add interface for aliveBroker and isShutDwon for Brokers. (#16323) Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- core/src/main/scala/kafka/server/BrokerServer.scala | 4 ++++ core/src/main/scala/kafka/server/KafkaBroker.scala | 1 + core/src/main/scala/kafka/server/KafkaServer.scala | 7 +++++++ core/src/test/java/kafka/test/ClusterInstance.java | 5 +++++ .../test/java/kafka/test/ClusterTestExtensionsTest.java | 15 +++++++++++++++ 5 files changed, 32 insertions(+) diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index a03128ab9f5..723e6b01b61 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -733,6 +733,10 @@ class BrokerServer( } } + override def isShutdown(): Boolean = { + status == SHUTDOWN || status == SHUTTING_DOWN + } + override def awaitShutdown(): Unit = { lock.lock() try { diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala b/core/src/main/scala/kafka/server/KafkaBroker.scala index 9e1ee3d6941..59a7c9c7ba0 100644 --- a/core/src/main/scala/kafka/server/KafkaBroker.scala +++ b/core/src/main/scala/kafka/server/KafkaBroker.scala @@ -97,6 +97,7 @@ trait KafkaBroker extends Logging { def awaitShutdown(): Unit def shutdown(): Unit = shutdown(Duration.ofMinutes(5)) def shutdown(timeout: Duration): Unit + def isShutdown(): Boolean def brokerTopicStats: BrokerTopicStats def credentialProvider: CredentialProvider def clientToControllerChannelManager: NodeToControllerChannelManager diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 3154ab1c344..df9d631dca5 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -1085,6 +1085,13 @@ class KafkaServer( } } + override def isShutdown(): Boolean = { + BrokerState.fromValue(brokerState.value()) match { + case BrokerState.SHUTTING_DOWN | BrokerState.NOT_RUNNING => true + case _ => false + } + } + /** * After calling shutdown(), use this API to wait until the shutdown is complete */ diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java index 6b85f12ba1b..d11d0764c8c 100644 --- a/core/src/test/java/kafka/test/ClusterInstance.java +++ b/core/src/test/java/kafka/test/ClusterInstance.java @@ -52,6 +52,11 @@ public interface ClusterInstance { Map<Integer, KafkaBroker> brokers(); + default Map<Integer, KafkaBroker> aliveBrokers() { + return brokers().entrySet().stream().filter(entry -> !entry.getValue().isShutdown()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + Map<Integer, ControllerServer> controllers(); /** diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java index 49add6ab353..c698632094e 100644 --- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java +++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java @@ -234,4 +234,19 @@ public class ClusterTestExtensionsTest { Assertions.assertTrue(clusterInstance.supportedGroupProtocols().contains(CLASSIC)); Assertions.assertEquals(1, clusterInstance.supportedGroupProtocols().size()); } + + @ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 4) + public void testClusterAliveBrokers(ClusterInstance clusterInstance) throws Exception { + clusterInstance.waitForReadyBrokers(); + + // Remove broker id 0 + clusterInstance.shutdownBroker(0); + Assertions.assertFalse(clusterInstance.aliveBrokers().containsKey(0)); + Assertions.assertTrue(clusterInstance.brokers().containsKey(0)); + + // add broker id 0 back + clusterInstance.startBroker(0); + Assertions.assertTrue(clusterInstance.aliveBrokers().containsKey(0)); + Assertions.assertTrue(clusterInstance.brokers().containsKey(0)); + } }