chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1580513112


##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -57,16 +57,18 @@ import org.slf4j.{Logger, LoggerFactory}
 
 import java.util
 import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
-import java.util.{Collections, Optional, Properties, UUID}
+import java.util.{Collections, Optional, UUID}
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
 object ZkMigrationIntegrationTest {
-  def addZkBrokerProps(props: Properties): Unit = {
-    props.setProperty("inter.broker.listener.name", "EXTERNAL")
-    props.setProperty("listeners", 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
-    props.setProperty("advertised.listeners", 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
-    props.setProperty("listener.security.protocol.map", 
"EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+  def addZkBrokerProps(builder: ClusterConfig.Builder): Unit = {

Review Comment:
   It seems this method is used by `zkClustersForAllMigrationVersions` only. We 
can merge them into one method. For example:
   ```scala
         val serverProperties = new util.HashMap[String, String]()
         serverProperties.put("inter.broker.listener.name", "EXTERNAL")
         serverProperties.put("listeners", 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
         serverProperties.put("advertised.listeners", 
"PLAINTEXT://localhost:0,EXTERNAL://localhost:0")
         serverProperties.put("listener.security.protocol.map", 
"EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
         clusterGenerator.accept(ClusterConfig.defaultBuilder()
           .setMetadataVersion(mv)
           .setBrokers(3)
           .setType(Type.ZK)
           .setServerProperties(serverProperties).build())
   ```



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -453,11 +464,16 @@ class ZkMigrationIntegrationTest {
 
       // Enable migration configs and restart brokers
       log.info("Restart brokers in migration mode")
-      
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
-      
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
kraftCluster.quorumVotersConfig())
-      
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
 "CONTROLLER")
-      
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
 "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-      zkCluster.rollingBrokerRestart()
+      val serverProperties = new util.HashMap[String, String]()
+      serverProperties.putAll(zkCluster.config().serverProperties())

Review Comment:
   ditto



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -517,11 +533,16 @@ class ZkMigrationIntegrationTest {
 
       // Enable migration configs and restart brokers
       log.info("Restart brokers in migration mode")
-      
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
-      
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
kraftCluster.quorumVotersConfig())
-      
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
 "CONTROLLER")
-      
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
 "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-      zkCluster.rollingBrokerRestart() // This would throw if authorizers 
weren't allowed
+      val serverProperties = new util.HashMap[String, String]()
+      serverProperties.putAll(zkCluster.config().serverProperties())

Review Comment:
   ditto



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -802,11 +838,16 @@ class ZkMigrationIntegrationTest {
 
       // Enable migration configs and restart brokers
       log.info("Restart brokers in migration mode")
-      
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
-      
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
kraftCluster.quorumVotersConfig())
-      
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
 "CONTROLLER")
-      
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
 "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-      zkCluster.rollingBrokerRestart()
+      val serverProperties = new util.HashMap[String, String]()
+      serverProperties.putAll(zkCluster.config().serverProperties())

Review Comment:
   ditto



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -667,11 +693,16 @@ class ZkMigrationIntegrationTest {
 
       // Enable migration configs and restart brokers
       log.info("Restart brokers in migration mode")
-      
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
-      
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
kraftCluster.quorumVotersConfig())
-      
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
 "CONTROLLER")
-      
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
 "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-      zkCluster.rollingBrokerRestart()
+      val serverProperties = new util.HashMap[String, String]()
+      serverProperties.putAll(zkCluster.config().serverProperties())

Review Comment:
   ditto



##########
core/src/test/java/kafka/test/ClusterConfig.java:
##########
@@ -139,28 +151,38 @@ public Map<String, String> nameTags() {
         return tags;
     }
 
-    public ClusterConfig copyOf() {
-        ClusterConfig copy = new ClusterConfig(type, brokers, controllers, 
name, autoStart, securityProtocol, listenerName, trustStoreFile, 
metadataVersion);
-        copy.serverProperties.putAll(serverProperties);
-        copy.producerProperties.putAll(producerProperties);
-        copy.consumerProperties.putAll(consumerProperties);
-        copy.saslServerProperties.putAll(saslServerProperties);
-        copy.saslClientProperties.putAll(saslClientProperties);
-        perBrokerOverrideProperties.forEach((brokerId, props) -> {
-            Properties propsCopy = new Properties();
-            propsCopy.putAll(props);
-            copy.perBrokerOverrideProperties.put(brokerId, propsCopy);
-        });
-        return copy;
+    public static Builder defaultBuilder() {
+        return new Builder()
+                .setType(Type.ZK)
+                .setBrokers(1)
+                .setControllers(1)
+                .setAutoStart(true)
+                .setSecurityProtocol(SecurityProtocol.PLAINTEXT)
+                .setMetadataVersion(MetadataVersion.latestTesting());
     }
 
-    public static Builder defaultClusterBuilder() {
-        return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, 
MetadataVersion.latestTesting());
+    public static Builder builder() {
+        return new Builder();
     }
 
-    public static Builder clusterBuilder(Type type, int brokers, int 
controllers, boolean autoStart,
-                                         SecurityProtocol securityProtocol, 
MetadataVersion metadataVersion) {
-        return new Builder(type, brokers, controllers, autoStart, 
securityProtocol, metadataVersion);
+    public static Builder builder(ClusterConfig clusterConfig) {

Review Comment:
   Could you please add unit test for this method? That deep copy is 
error-prone.



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -319,11 +325,16 @@ class ZkMigrationIntegrationTest {
 
       // Enable migration configs and restart brokers
       log.info("Restart brokers in migration mode")
-      
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
-      
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
kraftCluster.quorumVotersConfig())
-      
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
 "CONTROLLER")
-      
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
 "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-      zkCluster.rollingBrokerRestart()
+      val serverProperties = new util.HashMap[String, String]()
+      serverProperties.putAll(zkCluster.config().serverProperties())

Review Comment:
   ditto



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -602,11 +623,16 @@ class ZkMigrationIntegrationTest {
 
       // Enable migration configs and restart brokers
       log.info("Restart brokers in migration mode")
-      
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
-      
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
kraftCluster.quorumVotersConfig())
-      
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
 "CONTROLLER")
-      
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
 "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-      zkCluster.rollingBrokerRestart()
+      val serverProperties = new util.HashMap[String, String]()
+      serverProperties.putAll(zkCluster.config().serverProperties())

Review Comment:
   ditto



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -187,11 +188,16 @@ class ZkMigrationIntegrationTest {
 
       // Enable migration configs and restart brokers
       log.info("Restart brokers in migration mode")
-      
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
-      
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
kraftCluster.quorumVotersConfig())
-      
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
 "CONTROLLER")
-      
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
 "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-      zkCluster.rollingBrokerRestart() // This would throw if authorizers 
weren't allowed
+      val serverProperties = new util.HashMap[String, String]()

Review Comment:
   ```java
         val serverProperties = new util.HashMap[String, 
String](zkCluster.config().serverProperties())
   ```



##########
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##########
@@ -48,14 +51,18 @@ public class ClusterTestExtensionsTest {
 
     // Static methods can generate cluster configurations
     static void generate1(ClusterGenerator clusterGenerator) {
-        
clusterGenerator.accept(ClusterConfig.defaultClusterBuilder().name("Generated 
Test").build());
+        Map<String, String> serverProperties = new HashMap<>();
+        serverProperties.put("foo", "bar");
+        clusterGenerator.accept(ClusterConfig.defaultBuilder()
+                .setName("Generated Test")
+                .setServerProperties(serverProperties)
+                .build());
     }
 
     // BeforeEach run after class construction, but before cluster 
initialization and test invocation
     @BeforeEach
     public void beforeEach(ClusterConfig config) {

Review Comment:
   As `ClusterConfig` becomes immutable object, developers can modify the 
configs by injection of `ClusterConfig`. Maybe we should remove this usage from 
code base.  WDYT?



##########
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##########
@@ -48,14 +51,18 @@ public class ClusterTestExtensionsTest {
 
     // Static methods can generate cluster configurations
     static void generate1(ClusterGenerator clusterGenerator) {
-        
clusterGenerator.accept(ClusterConfig.defaultClusterBuilder().name("Generated 
Test").build());
+        Map<String, String> serverProperties = new HashMap<>();
+        serverProperties.put("foo", "bar");
+        clusterGenerator.accept(ClusterConfig.defaultBuilder()
+                .setName("Generated Test")
+                .setServerProperties(serverProperties)

Review Comment:
   We can use `Collections.singletonMap("foo", "bar")`



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -727,11 +758,16 @@ class ZkMigrationIntegrationTest {
 
       // Enable migration configs and restart brokers
       log.info("Restart brokers in migration mode")
-      
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
-      
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
kraftCluster.quorumVotersConfig())
-      
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
 "CONTROLLER")
-      
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
 "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-      zkCluster.rollingBrokerRestart()
+      val serverProperties = new util.HashMap[String, String]()
+      serverProperties.putAll(zkCluster.config().serverProperties())

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to