chia7712 commented on code in PR #19884: URL: https://github.com/apache/kafka/pull/19884#discussion_r2160062957
########## .claude/settings.local.json: ########## @@ -0,0 +1,8 @@ +{ + "permissions": { Review Comment: What is this? ########## test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java: ########## @@ -71,6 +71,39 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte private final ClusterConfig clusterConfig; private final boolean isCombined; + // Copied from TestUtils (package-private) + private static final long DEFAULT_POLL_INTERVAL_MS = 100; Review Comment: Those variables are unused. ########## test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java: ########## @@ -407,10 +411,57 @@ default int getLeaderBrokerId(TopicPartition topicPartition) throws ExecutionExc } } + /** + * Wait for a leader to be elected or changed using the provided admin client. + */ + default int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin, + String topic, + int partitionNumber, + long timeoutMs) throws Exception { + long startTime = System.currentTimeMillis(); + TopicPartition topicPartition = new TopicPartition(topic, partitionNumber); + + while (System.currentTimeMillis() < startTime + timeoutMs) { + try { + TopicDescription topicDescription = admin.describeTopics(List.of(topic)) + .allTopicNames().get().get(topic); + + Optional<Integer> leader = topicDescription.partitions().stream() + .filter(partitionInfo -> partitionInfo.partition() == partitionNumber) + .findFirst() + .map(partitionInfo -> { + int leaderId = partitionInfo.leader().id(); + return leaderId == Node.noNode().id() ? null : leaderId; + }); + + if (leader.isPresent()) { + return leader.get(); + } + } catch (InterruptedException e) { Review Comment: Perhaps the `InterruptedException` could be thrown directly? 
########## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ########## @@ -592,8 +592,10 @@ class KafkaConfigTest { props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "plaintext://localhost:9091,SsL://localhost:9092") props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT") val config = KafkaConfig.fromProps(props) - assertEquals(Some("SSL://localhost:9092"), config.listeners.find(_.listener == "SSL").map(JTestUtils.endpointToString)) - assertEquals(Some("PLAINTEXT://localhost:9091"), config.listeners.find(_.listener == "PLAINTEXT").map(JTestUtils.endpointToString)) + JTestUtils.assertEndpointsEqual(new Endpoint("SSL", SecurityProtocol.SSL, "localhost", 9092), Review Comment: `assertEndpointsEqual` is used by this class only, so it could be moved to this class. Otherwise, we will have a heavy `TestUtils`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org