chia7712 commented on code in PR #19884:
URL: https://github.com/apache/kafka/pull/19884#discussion_r2160062957


##########
.claude/settings.local.json:
##########
@@ -0,0 +1,8 @@
+{
+  "permissions": {

Review Comment:
   What is this file? It looks like a local tool settings file. Is it meant to be part of this PR?



##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java:
##########
@@ -71,6 +71,39 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
     private final ClusterConfig clusterConfig;
     private final boolean isCombined;
 
+    // Copied from TestUtils (package-private)
+    private static final long DEFAULT_POLL_INTERVAL_MS = 100;

Review Comment:
   These variables are unused.



##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java:
##########
@@ -407,10 +411,57 @@ default int getLeaderBrokerId(TopicPartition topicPartition) throws ExecutionExc
         }
     }
 
+    /**
+     * Wait for a leader to be elected or changed using the provided admin client.
+     */
+    default int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin,
+                                                           String topic,
+                                                           int partitionNumber,
+                                                           long timeoutMs) throws Exception {
+        long startTime = System.currentTimeMillis();
+        TopicPartition topicPartition = new TopicPartition(topic, partitionNumber);
+
+        while (System.currentTimeMillis() < startTime + timeoutMs) {
+            try {
+                TopicDescription topicDescription = admin.describeTopics(List.of(topic))
+                        .allTopicNames().get().get(topic);
+
+                Optional<Integer> leader = topicDescription.partitions().stream()
+                        .filter(partitionInfo -> partitionInfo.partition() == partitionNumber)
+                        .findFirst()
+                        .map(partitionInfo -> {
+                            int leaderId = partitionInfo.leader().id();
+                            return leaderId == Node.noNode().id() ? null : leaderId;
+                        });
+
+                if (leader.isPresent()) {
+                    return leader.get();
+                }
+            } catch (InterruptedException e) {

Review Comment:
   Perhaps the `InterruptedException` could be thrown directly instead of being caught here? The method already declares `throws Exception`.
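
   For illustration, a minimal sketch of a polling loop that lets `InterruptedException` propagate rather than catching it. It is deliberately free of Kafka types: `LeaderWaitSketch`, `waitForLeader`, `leaderLookup`, and `pollIntervalMs` are hypothetical names, and the lookup stands in for the `admin.describeTopics(...)` call. The rest of the loop in the PR is not visible in this hunk, so the timeout handling here is an assumption.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

public class LeaderWaitSketch {

    /**
     * Polls leaderLookup until it yields a leader id or timeoutMs elapses.
     * Nothing is caught here, so an InterruptedException raised by the
     * lookup or by Thread.sleep reaches the caller unchanged.
     */
    public static int waitForLeader(Callable<Optional<Integer>> leaderLookup,
                                    long timeoutMs,
                                    long pollIntervalMs) throws Exception {
        long startTime = System.currentTimeMillis();
        while (System.currentTimeMillis() < startTime + timeoutMs) {
            // Hypothetical stand-in for the admin-based leader lookup.
            Optional<Integer> leader = leaderLookup.call();
            if (leader.isPresent()) {
                return leader.get();
            }
            // Thread.sleep throws InterruptedException directly; no try/catch needed.
            Thread.sleep(pollIntervalMs);
        }
        // Assumption: signal a timeout with an exception once the deadline passes.
        throw new TimeoutException("No leader found within " + timeoutMs + " ms");
    }
}
```

   With this shape, a caller that is interrupted while waiting sees the `InterruptedException` itself, and the method signature does not need to change because it already declares `throws Exception`.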



##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -592,8 +592,10 @@ class KafkaConfigTest {
     props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "plaintext://localhost:9091,SsL://localhost:9092")
     props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT")
     val config = KafkaConfig.fromProps(props)
-    assertEquals(Some("SSL://localhost:9092"), config.listeners.find(_.listener == "SSL").map(JTestUtils.endpointToString))
-    assertEquals(Some("PLAINTEXT://localhost:9091"), config.listeners.find(_.listener == "PLAINTEXT").map(JTestUtils.endpointToString))
+    JTestUtils.assertEndpointsEqual(new Endpoint("SSL", SecurityProtocol.SSL, "localhost", 9092),

Review Comment:
   `assertEndpointsEqual` is used only by this class, so it could be moved into this class. Otherwise, `TestUtils` will become heavy.


