chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1580513112
########## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ########## @@ -57,16 +57,18 @@ import org.slf4j.{Logger, LoggerFactory} import java.util import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit} -import java.util.{Collections, Optional, Properties, UUID} +import java.util.{Collections, Optional, UUID} import scala.collection.Seq import scala.jdk.CollectionConverters._ object ZkMigrationIntegrationTest { - def addZkBrokerProps(props: Properties): Unit = { - props.setProperty("inter.broker.listener.name", "EXTERNAL") - props.setProperty("listeners", "PLAINTEXT://localhost:0,EXTERNAL://localhost:0") - props.setProperty("advertised.listeners", "PLAINTEXT://localhost:0,EXTERNAL://localhost:0") - props.setProperty("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") + def addZkBrokerProps(builder: ClusterConfig.Builder): Unit = { Review Comment: It seems this method is used by `zkClustersForAllMigrationVersions` only. We can merge them into one method. 
For example: ```scala val serverProperties = new util.HashMap[String, String]() serverProperties.put("inter.broker.listener.name", "EXTERNAL") serverProperties.put("listeners", "PLAINTEXT://localhost:0,EXTERNAL://localhost:0") serverProperties.put("advertised.listeners", "PLAINTEXT://localhost:0,EXTERNAL://localhost:0") serverProperties.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") clusterGenerator.accept(ClusterConfig.defaultBuilder() .setMetadataVersion(mv) .setBrokers(3) .setType(Type.ZK) .setServerProperties(serverProperties).build()) ``` ########## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ########## @@ -453,11 +464,16 @@ class ZkMigrationIntegrationTest { // Enable migration configs and restart brokers log.info("Restart brokers in migration mode") - zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true") - zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig()) - zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER") - zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") - zkCluster.rollingBrokerRestart() + val serverProperties = new util.HashMap[String, String]() + serverProperties.putAll(zkCluster.config().serverProperties()) Review Comment: ditto ########## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ########## @@ -517,11 +533,16 @@ class ZkMigrationIntegrationTest { // Enable migration configs and restart brokers log.info("Restart brokers in migration mode") - zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true") - zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig()) - 
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER") - zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") - zkCluster.rollingBrokerRestart() // This would throw if authorizers weren't allowed + val serverProperties = new util.HashMap[String, String]() + serverProperties.putAll(zkCluster.config().serverProperties()) Review Comment: ditto ########## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ########## @@ -802,11 +838,16 @@ class ZkMigrationIntegrationTest { // Enable migration configs and restart brokers log.info("Restart brokers in migration mode") - zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true") - zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig()) - zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER") - zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") - zkCluster.rollingBrokerRestart() + val serverProperties = new util.HashMap[String, String]() + serverProperties.putAll(zkCluster.config().serverProperties()) Review Comment: ditto ########## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ########## @@ -667,11 +693,16 @@ class ZkMigrationIntegrationTest { // Enable migration configs and restart brokers log.info("Restart brokers in migration mode") - zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true") - zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig()) - zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER") - 
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") - zkCluster.rollingBrokerRestart() + val serverProperties = new util.HashMap[String, String]() + serverProperties.putAll(zkCluster.config().serverProperties()) Review Comment: ditto ########## core/src/test/java/kafka/test/ClusterConfig.java: ########## @@ -139,28 +151,38 @@ public Map<String, String> nameTags() { return tags; } - public ClusterConfig copyOf() { - ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, metadataVersion); - copy.serverProperties.putAll(serverProperties); - copy.producerProperties.putAll(producerProperties); - copy.consumerProperties.putAll(consumerProperties); - copy.saslServerProperties.putAll(saslServerProperties); - copy.saslClientProperties.putAll(saslClientProperties); - perBrokerOverrideProperties.forEach((brokerId, props) -> { - Properties propsCopy = new Properties(); - propsCopy.putAll(props); - copy.perBrokerOverrideProperties.put(brokerId, propsCopy); - }); - return copy; + public static Builder defaultBuilder() { + return new Builder() + .setType(Type.ZK) + .setBrokers(1) + .setControllers(1) + .setAutoStart(true) + .setSecurityProtocol(SecurityProtocol.PLAINTEXT) + .setMetadataVersion(MetadataVersion.latestTesting()); } - public static Builder defaultClusterBuilder() { - return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, MetadataVersion.latestTesting()); + public static Builder builder() { + return new Builder(); } - public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart, - SecurityProtocol securityProtocol, MetadataVersion metadataVersion) { - return new Builder(type, brokers, controllers, autoStart, securityProtocol, metadataVersion); + public static Builder builder(ClusterConfig clusterConfig) { Review Comment: Could you 
please add a unit test for this method? That deep copy is error-prone. ########## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ########## @@ -319,11 +325,16 @@ class ZkMigrationIntegrationTest { // Enable migration configs and restart brokers log.info("Restart brokers in migration mode") - zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true") - zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig()) - zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER") - zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") - zkCluster.rollingBrokerRestart() + val serverProperties = new util.HashMap[String, String]() + serverProperties.putAll(zkCluster.config().serverProperties()) Review Comment: ditto ########## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ########## @@ -602,11 +623,16 @@ class ZkMigrationIntegrationTest { // Enable migration configs and restart brokers log.info("Restart brokers in migration mode") - zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true") - zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig()) - zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER") - zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") - zkCluster.rollingBrokerRestart() + val serverProperties = new util.HashMap[String, String]() + serverProperties.putAll(zkCluster.config().serverProperties()) Review Comment: ditto ########## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ########## @@ -187,11 +188,16 @@ class
ZkMigrationIntegrationTest { // Enable migration configs and restart brokers log.info("Restart brokers in migration mode") - zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true") - zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig()) - zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER") - zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") - zkCluster.rollingBrokerRestart() // This would throw if authorizers weren't allowed + val serverProperties = new util.HashMap[String, String]() Review Comment: ```scala val serverProperties = new util.HashMap[String, String](zkCluster.config().serverProperties()) ``` ########## core/src/test/java/kafka/test/ClusterTestExtensionsTest.java: ########## @@ -48,14 +51,18 @@ public class ClusterTestExtensionsTest { // Static methods can generate cluster configurations static void generate1(ClusterGenerator clusterGenerator) { - clusterGenerator.accept(ClusterConfig.defaultClusterBuilder().name("Generated Test").build()); + Map<String, String> serverProperties = new HashMap<>(); + serverProperties.put("foo", "bar"); + clusterGenerator.accept(ClusterConfig.defaultBuilder() + .setName("Generated Test") + .setServerProperties(serverProperties) + .build()); } // BeforeEach run after class construction, but before cluster initialization and test invocation @BeforeEach public void beforeEach(ClusterConfig config) { Review Comment: As `ClusterConfig` becomes an immutable object, developers can no longer modify the configs by injection of `ClusterConfig`. Maybe we should remove this usage from the code base. WDYT?
########## core/src/test/java/kafka/test/ClusterTestExtensionsTest.java: ########## @@ -48,14 +51,18 @@ public class ClusterTestExtensionsTest { // Static methods can generate cluster configurations static void generate1(ClusterGenerator clusterGenerator) { - clusterGenerator.accept(ClusterConfig.defaultClusterBuilder().name("Generated Test").build()); + Map<String, String> serverProperties = new HashMap<>(); + serverProperties.put("foo", "bar"); + clusterGenerator.accept(ClusterConfig.defaultBuilder() + .setName("Generated Test") + .setServerProperties(serverProperties) Review Comment: We can use `Collections.singletonMap("foo", "bar")` ########## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ########## @@ -727,11 +758,16 @@ class ZkMigrationIntegrationTest { // Enable migration configs and restart brokers log.info("Restart brokers in migration mode") - zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true") - zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig()) - zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER") - zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") - zkCluster.rollingBrokerRestart() + val serverProperties = new util.HashMap[String, String]() + serverProperties.putAll(zkCluster.config().serverProperties()) Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org