chia7712 commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1688198603
########## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ########## @@ -95,98 +88,60 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; /** - * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for - * integration tests. + * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the + * specified number of brokers and the specified broker properties. This can be used for integration tests and is + * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be + * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data, + * consume data etc. 
*/ public class EmbeddedKafkaCluster { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); - private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); + private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); - // Kafka Config - private final KafkaServer[] brokers; + private final KafkaClusterTestKit cluster; private final Properties brokerConfig; - private final Time time = Time.SYSTEM; - private final int[] currentBrokerPorts; - private final String[] currentBrokerLogDirs; - private final boolean hasListenerConfig; + private final Map<String, String> clientConfigs; - final Map<String, String> clientConfigs; - - private EmbeddedZookeeper zookeeper = null; - private ListenerName listenerName = new ListenerName("PLAINTEXT"); private KafkaProducer<byte[], byte[]> producer; - public EmbeddedKafkaCluster(final int numBrokers, - final Properties brokerConfig) { + public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) { this(numBrokers, brokerConfig, Collections.emptyMap()); } public EmbeddedKafkaCluster(final int numBrokers, - final Properties brokerConfig, - final Map<String, String> clientConfigs) { - brokers = new KafkaServer[numBrokers]; - currentBrokerPorts = new int[numBrokers]; - currentBrokerLogDirs = new String[numBrokers]; - this.brokerConfig = brokerConfig; - // Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether - // a listener config is defined during initialization in order to know if it's - // safe to override it - hasListenerConfig = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null; + final Properties brokerConfig, + final Map<String, String> clientConfigs) { + addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers); + try { + KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder( Review Comment: I just want to apply the new test infra to all modules. 
That would unify the code base, and all modules would benefit whenever we improve it. BTW, I don't want to block this PR, but it seems to me that some tests in the connect module could adopt the new test infra first. We can work on them concurrently :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org