chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1573839545


##########
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##########
@@ -290,7 +287,7 @@ public void waitForReadyBrokers() throws InterruptedException {
         }
 
         @Override
-        public void rollingBrokerRestart() {
+        public void rollingBrokerRestart(Optional<ClusterConfig> clusterConfig) {

Review Comment:
   Since not all implementations support this method, we should remove it from the interface. Callers can use `getUnderlying` to get the ZK instance and call the method there.
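
   A rough caller-side sketch of that idea (names are illustrative, not the exact types in this PR):

   ```java
   // Sketch only: assume getUnderlying() hands back the ZK-backed implementation
   // ("ZkClusterInstance" here is illustrative) and that rollingBrokerRestart(...)
   // stays on that class once it is dropped from the ClusterInstance interface.
   ZkClusterInstance zkCluster = (ZkClusterInstance) clusterInstance.getUnderlying();
   zkCluster.rollingBrokerRestart(Optional.of(clusterConfig));
   ```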



##########
core/src/test/java/kafka/test/ClusterConfig.java:
##########
@@ -211,13 +186,36 @@ public static class Builder {
         private String listenerName;
         private File trustStoreFile;
         private MetadataVersion metadataVersion;
-        private Properties serverProperties = new Properties();
-        private Properties producerProperties = new Properties();
-        private Properties consumerProperties = new Properties();
-        private Properties adminClientProperties = new Properties();
-        private Properties saslServerProperties = new Properties();
-        private Properties saslClientProperties = new Properties();
-        private final Map<Integer, Properties> perBrokerOverrideProperties = new HashMap<>();
+        private Map<String, String> serverProperties = new HashMap<>();
+        private Map<String, String> producerProperties = new HashMap<>();
+        private Map<String, String> consumerProperties = new HashMap<>();
+        private Map<String, String> adminClientProperties = new HashMap<>();
+        private Map<String, String> saslServerProperties = new HashMap<>();
+        private Map<String, String> saslClientProperties = new HashMap<>();
+        private Map<Integer, Map<String, String>> perBrokerOverrideProperties = new HashMap<>();
+
+        Builder() {}
+
+        Builder(ClusterConfig clusterConfig) {
+            this.type = clusterConfig.type;
+            this.brokers = clusterConfig.brokers;
+            this.controllers = clusterConfig.controllers;
+            this.name = clusterConfig.name;
+            this.autoStart = clusterConfig.autoStart;
+            this.securityProtocol = clusterConfig.securityProtocol;
+            this.listenerName = clusterConfig.listenerName;
+            this.trustStoreFile = clusterConfig.trustStoreFile;
+            this.metadataVersion = clusterConfig.metadataVersion;
+            this.serverProperties = new HashMap<>(clusterConfig.serverProperties);
+            this.producerProperties = new HashMap<>(clusterConfig.producerProperties);
+            this.consumerProperties = new HashMap<>(clusterConfig.consumerProperties);
+            this.adminClientProperties = new HashMap<>(clusterConfig.adminClientProperties);
+            this.saslServerProperties = new HashMap<>(clusterConfig.saslServerProperties);
+            this.saslClientProperties = new HashMap<>(clusterConfig.saslClientProperties);
+            Map<Integer, Map<String, String>> perBrokerOverrideProps = new HashMap<>();
+            clusterConfig.perBrokerOverrideProperties.forEach((k, v) -> perBrokerOverrideProps.put(k, new HashMap<>(v)));
+            this.perBrokerOverrideProperties = perBrokerOverrideProps;

Review Comment:
   ```java
               this.perBrokerOverrideProperties = clusterConfig.perBrokerOverrideProperties.entrySet().stream()
                       .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashMap<>(e.getValue())));
   ```



##########
core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala:
##########
@@ -18,41 +18,58 @@ package kafka.server
 
 import java.net.Socket
 import java.util.Collections
-
 import kafka.api.{KafkaSasl, SaslSetup}
-import kafka.test.annotation.{ClusterTest, Type}
+import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, kafkaServerSaslMechanisms}
+import kafka.test.annotation.{ClusterTemplate, Type}
 import kafka.test.junit.ClusterTestExtensions
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
 import kafka.utils.JaasTestUtils
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.message.SaslHandshakeRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse}
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.server.config.KafkaSecurityConfigs
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.extension.ExtendWith
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 
 import scala.jdk.CollectionConverters._
 
+object SaslApiVersionsRequestTest {
+  val kafkaClientSaslMechanism = "PLAIN"
+  val kafkaServerSaslMechanisms: Seq[String] = List("PLAIN")
+  val controlPlaneListenerName = "CONTROL_PLANE"
+  val securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+
+  def saslApiVersionsRequestClusterConfig(clusterGenerator: ClusterGenerator): Unit = {
+    clusterGenerator.accept(ClusterConfig.defaultBuilder
+      .securityProtocol(securityProtocol)
+      .`type`(Type.ZK)
+      .putSaslServerProperty(KafkaSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, kafkaClientSaslMechanism)
+      .putSaslServerProperty(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, kafkaServerSaslMechanisms.mkString(","))
+      .putSaslClientProperty(SaslConfigs.SASL_MECHANISM, kafkaClientSaslMechanism)
+      // Configure control plane listener to make sure we have separate listeners for testing.
+      .putServerProperty(KafkaConfig.ControlPlaneListenerNameProp, controlPlaneListenerName)
+      .putServerProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"$controlPlaneListenerName:$securityProtocol,$securityProtocol:$securityProtocol")
+      .putServerProperty("listeners", s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
+      .putServerProperty(KafkaConfig.AdvertisedListenersProp, s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
+      .build())
+  }
+}
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
-
-  val kafkaClientSaslMechanism = "PLAIN"
-  val kafkaServerSaslMechanisms = List("PLAIN")
-
   private var sasl: SaslSetup = _
 
   @BeforeEach
-  def setupSasl(config: ClusterConfig): Unit = {
+  def setupSasl(): Unit = {
     sasl = new SaslSetup() {}

Review Comment:
   Could we create `sasl` at initialization time (e.g. as a field initializer) instead of in `setupSasl`?



##########
core/src/test/java/kafka/test/ClusterConfig.java:
##########
@@ -67,13 +69,16 @@ public class ClusterConfig {
         this.listenerName = listenerName;
         this.trustStoreFile = trustStoreFile;
         this.metadataVersion = metadataVersion;
-        this.serverProperties = copyOf(serverProperties);
-        this.producerProperties = copyOf(producerProperties);
-        this.consumerProperties = copyOf(consumerProperties);
-        this.adminClientProperties = copyOf(adminClientProperties);
-        this.saslServerProperties = copyOf(saslServerProperties);
-        this.saslClientProperties = copyOf(saslClientProperties);
-        perBrokerOverrideProperties.forEach((brokerId, props) -> this.perBrokerOverrideProperties.put(brokerId, copyOf(props)));
+        this.serverProperties = Collections.unmodifiableMap(serverProperties);
+        this.producerProperties = Collections.unmodifiableMap(producerProperties);
+        this.consumerProperties = Collections.unmodifiableMap(consumerProperties);
+        this.adminClientProperties = Collections.unmodifiableMap(adminClientProperties);
+        this.saslServerProperties = Collections.unmodifiableMap(saslServerProperties);
+        this.saslClientProperties = Collections.unmodifiableMap(saslClientProperties);
+        this.perBrokerOverrideProperties = Collections.unmodifiableMap(

Review Comment:
   To keep things consistent, please do the deep copy in either the Builder or this constructor.
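
   For reference, a sketch of doing the deep copy here in the constructor (it assumes the `Collections`/`Collectors`/`HashMap` imports already shown in this file's import diff):

   ```java
   // Deep-copy each per-broker map before wrapping, so the stored view is both
   // unmodifiable and detached from the caller's maps.
   this.perBrokerOverrideProperties = Collections.unmodifiableMap(
           perBrokerOverrideProperties.entrySet().stream()
                   .collect(Collectors.toMap(Map.Entry::getKey,
                           e -> Collections.unmodifiableMap(new HashMap<>(e.getValue())))));
   ```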



##########
core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:
##########
@@ -17,48 +17,78 @@
 
 package kafka.server
 
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.ClusterInstance
 import org.apache.kafka.common.message.ApiVersionsRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.ApiVersionsRequest
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, ClusterTests, Type}
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.extension.ExtendWith
 
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
+@ClusterTestDefaults(brokers = 1)
class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
 
-  @BeforeEach
-  def setup(config: ClusterConfig): Unit = {
-    super.brokerPropertyOverrides(config.serverProperties())
-  }
-
-  @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
-    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
-    new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"),
+  @ClusterTests(Array(
+    new ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(

Review Comment:
   Could we simplify the annotation by allowing a `ClusterTemplate` to be defined inside `ClusterTests`?
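
   As a hypothetical sketch of that idea (the `templates` attribute does not exist in the PR; it only illustrates letting `ClusterTests` carry `ClusterTemplate` entries next to `ClusterTest` entries):

   ```java
   package kafka.test.annotation;

   import java.lang.annotation.ElementType;
   import java.lang.annotation.Retention;
   import java.lang.annotation.RetentionPolicy;
   import java.lang.annotation.Target;

   // Illustrative shape only, not the PR's actual annotation.
   @Target(ElementType.METHOD)
   @Retention(RetentionPolicy.RUNTIME)
   public @interface ClusterTests {
       ClusterTest[] value();
       ClusterTemplate[] templates() default {};
   }
   ```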



##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -97,14 +96,32 @@ public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) {
                 if (!brokerNodeBuilders.isEmpty()) {
                     nextId = brokerNodeBuilders.lastKey() + 1;
                 }
-                BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()
+                BrokerNode.Builder brokerNodeBuilder = BrokerNode.builder()
                         .setId(nextId)
                         .setNumLogDirectories(disksPerBroker);
                 brokerNodeBuilders.put(nextId, brokerNodeBuilder);
             }
             return this;
         }
 
+        /**
+         * Set per broker properties overrides, this setter must be invoked after setBrokerNodes which

Review Comment:
   Requiring a happens-before ordering between setters is an anti-pattern for the builder pattern. Could we merge `PerBrokerPropertiesOverrides` into `setNumControllerNodes`/`setNumBrokerNodes`? For example:
   
   ```java
   setBrokerNodes(int numBrokerNodes, int disksPerBroker, Function<Integer, Map<String, Object>> perBrokerConfigs)
   
   ```
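
   A call site could then look roughly like this (sketch only; the three-argument overload is the proposal above, not existing API):

   ```java
   // Illustrative usage of the proposed overload: broker 0 gets an extra
   // property, the remaining brokers get no overrides.
   TestKitNodes nodes = new TestKitNodes.Builder()
           .setNumControllerNodes(1)
           .setBrokerNodes(3, 1, brokerId -> brokerId == 0
                   ? Collections.<String, Object>singletonMap("log.retention.ms", "60000")
                   : Collections.<String, Object>emptyMap())
           .build();
   ```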



##########
core/src/test/java/kafka/test/ClusterConfig.java:
##########
@@ -22,11 +22,13 @@
 import org.apache.kafka.server.common.MetadataVersion;
 
 import java.io.File;
+import java.util.AbstractMap;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Properties;
+import java.util.stream.Collectors;
 
 /**

Review Comment:
   Please add a class-level comment stating that this class is an immutable object.
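
   Something along these lines would do (sketch only):

   ```java
   /**
    * Immutable class describing the requested configuration of a cluster for integration
    * testing. All collection-valued fields are unmodifiable copies, so an instance cannot
    * change after construction; use {@link Builder} to derive modified configurations.
    */
   ```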



##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -97,14 +96,32 @@ public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) {
                 if (!brokerNodeBuilders.isEmpty()) {
                     nextId = brokerNodeBuilders.lastKey() + 1;
                 }
-                BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()
+                BrokerNode.Builder brokerNodeBuilder = BrokerNode.builder()
                         .setId(nextId)
                         .setNumLogDirectories(disksPerBroker);
                 brokerNodeBuilders.put(nextId, brokerNodeBuilder);
             }
             return this;
         }
 
+        /**
+         * Set per broker properties overrides, this setter must be invoked after setBrokerNodes which
+         * setup broker id and broker builder.
+         * @param perBrokerPropertiesOverrides properties to override in each broker
+         * @return Builder
+         */
+        public Builder setPerBrokerPropertiesOverrides(Map<Integer, Map<String, String>> perBrokerPropertiesOverrides) {
+            perBrokerPropertiesOverrides.forEach((brokerId, properties) -> {
+                if (!brokerNodeBuilders.containsKey(brokerId)) {
+                    throw new RuntimeException("Broker id " + brokerId + " does not exist");
+                }
+                Map<String, String> propertiesOverride = new HashMap<>();
+                properties.forEach((key, value) -> propertiesOverride.put(key.toString(), value.toString()));

Review Comment:
   `toString` is redundant here, since the keys and values are already `String`s.
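
   A minimal sketch of the simplification, replacing the two lines above:

   ```java
   // Keys and values are already Strings, so a plain copy is sufficient.
   Map<String, String> propertiesOverride = new HashMap<>(properties);
   ```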



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -435,15 +443,13 @@ class KRaftClusterTest {
 
   private def doOnStartedKafkaCluster(numControllerNodes: Int = 1,
                                       numBrokerNodes: Int,
-                                      brokerPropertyOverrides: (TestKitNodes, BrokerNode) => Map[String, String])
+                                      brokerPropertyOverrides: util.Map[Integer, util.Map[String, String]])
                                      (action: KafkaClusterTestKit => Unit): Unit = {
     val nodes = new TestKitNodes.Builder()

Review Comment:
   Could we change the style to `TestKitNodes.builder()` to unify the builder pattern used in tests?
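
   If `TestKitNodes` does not have such a factory yet, the addition could look like this (hypothetical sketch, mirroring `BrokerNode.builder()` from the diff above):

   ```java
   // Static factory so call sites can write TestKitNodes.builder()
   // instead of new TestKitNodes.Builder().
   public static Builder builder() {
       return new Builder();
   }
   ```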



##########
core/src/test/java/kafka/testkit/BrokerNode.java:
##########
@@ -66,11 +74,27 @@ public Builder setNumLogDirectories(int numLogDirectories) {
             return this;
         }
 
-        public BrokerNode build(
-            String baseDirectory,
-            Uuid clusterId,
-            boolean combined
-        ) {
+        public Builder setClusterId(Uuid clusterId) {

Review Comment:
   The `ClusterConfig` builder does not use a `set` prefix. Could we unify the naming?
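
   i.e. something like the following rename (sketch only; the field assignment is an assumption about the builder's internals):

   ```java
   // Same setter as above, named without the "set" prefix to match ClusterConfig.Builder.
   public Builder clusterId(Uuid clusterId) {
       this.clusterId = clusterId;
       return this;
   }
   ```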


