Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-29 Thread via GitHub


C0urante merged PR #16599:
URL: https://github.com/apache/kafka/pull/16599


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-29 Thread via GitHub


C0urante commented on PR #16599:
URL: https://github.com/apache/kafka/pull/16599#issuecomment-2256133214

   Thanks @chia7712!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-26 Thread via GitHub


C0urante commented on PR #16599:
URL: https://github.com/apache/kafka/pull/16599#issuecomment-2253149294

   Think I found the root cause. Pushed a fix, will try to get three CI runs 
without a failure on that test case before merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-26 Thread via GitHub


C0urante commented on PR #16599:
URL: https://github.com/apache/kafka/pull/16599#issuecomment-2253087786

   I've noticed that the `IdentityReplicationIntegrationTest::testReplication` 
case has failed four times on this PR branch with the same cause:
   ```
   org.opentest4j.AssertionFailedError: topic config was not synced ==> 
expected:  but was: 
at 
app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:308)
at java.base@21.0.3/java.lang.reflect.Method.invoke(Method.java:580)
at java.base@21.0.3/java.util.ArrayList.forEach(ArrayList.java:1596)
at java.base@21.0.3/java.util.ArrayList.forEach(ArrayList.java:1596)
   ```
   AFAIK this exact failure hasn't been occurring on trunk. I'd like to 
investigate a little more before merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-26 Thread via GitHub


mimaison commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1692881997


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -199,149 +153,74 @@ private void doStart() {
 producer = new KafkaProducer<>(producerProps, new 
ByteArraySerializer(), new ByteArraySerializer());
 }
 
-public void stopOnlyKafka() {
-stop(false, false);
-}
-
-public void stop() {
-stop(true, true);
-}
-
-private void stop(boolean deleteLogDirs, boolean stopZK) {
-maybeShutDownProducer();
-triggerBrokerShutdown();
-awaitBrokerShutdown();
-
-if (deleteLogDirs)
-deleteLogDirs();
-
-if (stopZK)
-stopZK();
-}
-
-private void maybeShutDownProducer() {
-try {
-if (producer != null) {
-producer.close();
-}
-} catch (Exception e) {
-log.error("Could not shutdown producer ", e);
-throw new RuntimeException("Could not shutdown producer", e);
-}
-}
-
-private void triggerBrokerShutdown() {
-for (KafkaServer broker : brokers) {
-try {
-broker.shutdown();
-} catch (Throwable t) {
-String msg = String.format("Could not shutdown broker at %s", 
address(broker));
-log.error(msg, t);
-throw new RuntimeException(msg, t);
-}
-}
-}
-
-private void awaitBrokerShutdown() {
-for (KafkaServer broker : brokers) {
-try {
-broker.awaitShutdown();
-} catch (Throwable t) {
-String msg = String.format("Failed while awaiting shutdown of 
broker at %s", address(broker));
-log.error(msg, t);
-throw new RuntimeException(msg, t);
-}
-}
-}
-
-private void deleteLogDirs() {
-for (KafkaServer broker : brokers) {
-try {
-log.info("Cleaning up kafka log dirs at {}", 
broker.config().logDirs());
-CoreUtils.delete(broker.config().logDirs());
-} catch (Throwable t) {
-String msg = String.format("Could not clean up log dirs for 
broker at %s",
-address(broker));
-log.error(msg, t);
-throw new RuntimeException(msg, t);
-}
-}
-}
-
-private void stopZK() {
-try {
-zookeeper.shutdown();
-} catch (Throwable t) {
-String msg = String.format("Could not shutdown zookeeper at %s", 
zKConnectString());
-log.error(msg, t);
-throw new RuntimeException(msg, t);
-}
+/**
+ * Restarts the Kafka brokers. This can be called after {@link 
#stopTemporarily()}. Note that if the Kafka brokers
+ * need to be listening on the same ports as earlier, the {@link 
#brokerConfig} should contain the
+ * {@link SocketServerConfigs#LISTENERS_CONFIG} property and it should use 
a fixed non-zero free port. Also note that this is
+ * only possible when {@code numBrokers} is 1.
+ */
+public void restart() {
+cluster.brokers().values().forEach(BrokerServer::startup);
 }
 
-private static void putIfAbsent(final Properties props, final String 
propertyKey, final Object propertyValue) {
-if (!props.containsKey(propertyKey)) {
-props.put(propertyKey, propertyValue);
-}
+/**
+ * Stop the brokers (and controllers) in the Kafka cluster. This can be 
used to test Connect's functionality

Review Comment:
   That will teach me to trust method names! Thanks for the explanation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-25 Thread via GitHub


C0urante commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1691585017


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -95,98 +87,60 @@
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and 
specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link 
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can 
be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. 
Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to 
easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
 private static final Logger log = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120);
 
-// Kafka Config
-private final KafkaServer[] brokers;
+private final KafkaClusterTestKit cluster;
 private final Properties brokerConfig;
-private final Time time = Time.SYSTEM;
-private final int[] currentBrokerPorts;
-private final String[] currentBrokerLogDirs;
-private final boolean hasListenerConfig;
+private final Map clientConfigs;
 
-final Map clientConfigs;
-
-private EmbeddedZookeeper zookeeper = null;
-private ListenerName listenerName = new ListenerName("PLAINTEXT");
 private KafkaProducer producer;
 
-public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig) {
+public EmbeddedKafkaCluster(final int numBrokers, final Properties 
brokerConfig) {
 this(numBrokers, brokerConfig, Collections.emptyMap());
 }
 
 public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig,
-final Map clientConfigs) {
-brokers = new KafkaServer[numBrokers];
-currentBrokerPorts = new int[numBrokers];
-currentBrokerLogDirs = new String[numBrokers];
-this.brokerConfig = brokerConfig;
-// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we 
track whether
-// a listener config is defined during initialization in order to know 
if it's
-// safe to override it
-hasListenerConfig = 
brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+   final Properties brokerConfig,
+   final Map clientConfigs) {
+addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+try {
+KafkaClusterTestKit.Builder clusterBuilder = new 
KafkaClusterTestKit.Builder(
+new TestKitNodes.Builder()
+.setCombined(true)
+.setNumBrokerNodes(numBrokers)
+.setNumControllerNodes(numBrokers)

Review Comment:
   Great call! I tested locally and fixing the number of controller nodes at 1 
seemed to reduce the runtime of the 
`InternalTopicsIntegrationTest::testCreateInternalTopicsWithDefaultSettings` 
case by about 6-7 seconds on my laptop. There aren't many other tests that use 
multiple brokers but I've hardcoded the number of controllers to 1 in my latest 
commit just to play it safe.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-25 Thread via GitHub


chia7712 commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1691331640


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -95,98 +87,60 @@
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and 
specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link 
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can 
be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. 
Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to 
easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
 private static final Logger log = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120);
 
-// Kafka Config
-private final KafkaServer[] brokers;
+private final KafkaClusterTestKit cluster;
 private final Properties brokerConfig;
-private final Time time = Time.SYSTEM;
-private final int[] currentBrokerPorts;
-private final String[] currentBrokerLogDirs;
-private final boolean hasListenerConfig;
+private final Map clientConfigs;
 
-final Map clientConfigs;
-
-private EmbeddedZookeeper zookeeper = null;
-private ListenerName listenerName = new ListenerName("PLAINTEXT");
 private KafkaProducer producer;
 
-public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig) {
+public EmbeddedKafkaCluster(final int numBrokers, final Properties 
brokerConfig) {
 this(numBrokers, brokerConfig, Collections.emptyMap());
 }
 
 public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig,
-final Map clientConfigs) {
-brokers = new KafkaServer[numBrokers];
-currentBrokerPorts = new int[numBrokers];
-currentBrokerLogDirs = new String[numBrokers];
-this.brokerConfig = brokerConfig;
-// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we 
track whether
-// a listener config is defined during initialization in order to know 
if it's
-// safe to override it
-hasListenerConfig = 
brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+   final Properties brokerConfig,
+   final Map clientConfigs) {
+addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+try {
+KafkaClusterTestKit.Builder clusterBuilder = new 
KafkaClusterTestKit.Builder(
+new TestKitNodes.Builder()
+.setCombined(true)
+.setNumBrokerNodes(numBrokers)
+.setNumControllerNodes(numBrokers)

Review Comment:
   If the quorum controllers are black box to connect, maybe we can hard code 
the controllers to `3` (or `1` to reduce the resources in testing)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-25 Thread via GitHub


OmniaGM commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1691268684


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -95,98 +87,60 @@
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and 
specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link 
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can 
be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. 
Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to 
easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
 private static final Logger log = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120);
 
-// Kafka Config
-private final KafkaServer[] brokers;
+private final KafkaClusterTestKit cluster;
 private final Properties brokerConfig;
-private final Time time = Time.SYSTEM;
-private final int[] currentBrokerPorts;
-private final String[] currentBrokerLogDirs;
-private final boolean hasListenerConfig;
+private final Map clientConfigs;
 
-final Map clientConfigs;
-
-private EmbeddedZookeeper zookeeper = null;
-private ListenerName listenerName = new ListenerName("PLAINTEXT");
 private KafkaProducer producer;
 
-public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig) {
+public EmbeddedKafkaCluster(final int numBrokers, final Properties 
brokerConfig) {
 this(numBrokers, brokerConfig, Collections.emptyMap());
 }
 
 public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig,
-final Map clientConfigs) {
-brokers = new KafkaServer[numBrokers];
-currentBrokerPorts = new int[numBrokers];
-currentBrokerLogDirs = new String[numBrokers];
-this.brokerConfig = brokerConfig;
-// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we 
track whether
-// a listener config is defined during initialization in order to know 
if it's
-// safe to override it
-hasListenerConfig = 
brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+   final Properties brokerConfig,
+   final Map clientConfigs) {
+addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+try {
+KafkaClusterTestKit.Builder clusterBuilder = new 
KafkaClusterTestKit.Builder(
+new TestKitNodes.Builder()
+.setCombined(true)
+.setNumBrokerNodes(numBrokers)
+.setNumControllerNodes(numBrokers)

Review Comment:
   Do we care about num of controller == num of brokers can't be just hardcoded 
to 1. Bit concerned about cases where some one set unsuitable number for 
`numBrokers` not knowing this will impact quorum as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-25 Thread via GitHub


OmniaGM commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1691268684


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -95,98 +87,60 @@
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and 
specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link 
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can 
be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. 
Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to 
easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
 private static final Logger log = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120);
 
-// Kafka Config
-private final KafkaServer[] brokers;
+private final KafkaClusterTestKit cluster;
 private final Properties brokerConfig;
-private final Time time = Time.SYSTEM;
-private final int[] currentBrokerPorts;
-private final String[] currentBrokerLogDirs;
-private final boolean hasListenerConfig;
+private final Map clientConfigs;
 
-final Map clientConfigs;
-
-private EmbeddedZookeeper zookeeper = null;
-private ListenerName listenerName = new ListenerName("PLAINTEXT");
 private KafkaProducer producer;
 
-public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig) {
+public EmbeddedKafkaCluster(final int numBrokers, final Properties 
brokerConfig) {
 this(numBrokers, brokerConfig, Collections.emptyMap());
 }
 
 public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig,
-final Map clientConfigs) {
-brokers = new KafkaServer[numBrokers];
-currentBrokerPorts = new int[numBrokers];
-currentBrokerLogDirs = new String[numBrokers];
-this.brokerConfig = brokerConfig;
-// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we 
track whether
-// a listener config is defined during initialization in order to know 
if it's
-// safe to override it
-hasListenerConfig = 
brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+   final Properties brokerConfig,
+   final Map clientConfigs) {
+addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+try {
+KafkaClusterTestKit.Builder clusterBuilder = new 
KafkaClusterTestKit.Builder(
+new TestKitNodes.Builder()
+.setCombined(true)
+.setNumBrokerNodes(numBrokers)
+.setNumControllerNodes(numBrokers)

Review Comment:
   Do we care about num of controller == num of brokers can't be just hardcoded 
to 1. Bit concerned about cases where some one set unsuitable number not 
knowing this will impact quorum as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-25 Thread via GitHub


chia7712 commented on PR #16599:
URL: https://github.com/apache/kafka/pull/16599#issuecomment-2250034366

   @C0urante Could you please rebase code to include fix #16688 for one more CI?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-25 Thread via GitHub


chia7712 commented on PR #16599:
URL: https://github.com/apache/kafka/pull/16599#issuecomment-2249611573

   The failure will be fixed by #16688


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-24 Thread via GitHub


chia7712 commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1690292626


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -447,9 +326,9 @@ public void createTopic(String topic, int partitions, int 
replication, Map topicConfig, Map adminClientConfig) {
-if (replication > brokers.length) {
+if (replication > cluster.brokers().size()) {

Review Comment:
   > unless it would leave to a regression I'd prefer not to block this PR on 
figuring that out and save it for a follow-up. Is that alright?
   
   make senses to me :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-24 Thread via GitHub


C0urante commented on PR #16599:
URL: https://github.com/apache/kafka/pull/16599#issuecomment-2248666797

   Thanks @chia7712 for the review! Pushed a few commits that should hopefully 
address your comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-24 Thread via GitHub


C0urante commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1690265976


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -199,149 +153,74 @@ private void doStart() {
 producer = new KafkaProducer<>(producerProps, new 
ByteArraySerializer(), new ByteArraySerializer());
 }
 
-public void stopOnlyKafka() {
-stop(false, false);
-}
-
-public void stop() {
-stop(true, true);
-}
-
-private void stop(boolean deleteLogDirs, boolean stopZK) {
-maybeShutDownProducer();
-triggerBrokerShutdown();
-awaitBrokerShutdown();
-
-if (deleteLogDirs)
-deleteLogDirs();
-
-if (stopZK)
-stopZK();
-}
-
-private void maybeShutDownProducer() {
-try {
-if (producer != null) {
-producer.close();
-}
-} catch (Exception e) {
-log.error("Could not shutdown producer ", e);
-throw new RuntimeException("Could not shutdown producer", e);
-}
-}
-
-private void triggerBrokerShutdown() {
-for (KafkaServer broker : brokers) {
-try {
-broker.shutdown();
-} catch (Throwable t) {
-String msg = String.format("Could not shutdown broker at %s", 
address(broker));
-log.error(msg, t);
-throw new RuntimeException(msg, t);
-}
-}
-}
-
-private void awaitBrokerShutdown() {
-for (KafkaServer broker : brokers) {
-try {
-broker.awaitShutdown();
-} catch (Throwable t) {
-String msg = String.format("Failed while awaiting shutdown of 
broker at %s", address(broker));
-log.error(msg, t);
-throw new RuntimeException(msg, t);
-}
-}
-}
-
-private void deleteLogDirs() {
-for (KafkaServer broker : brokers) {
-try {
-log.info("Cleaning up kafka log dirs at {}", 
broker.config().logDirs());
-CoreUtils.delete(broker.config().logDirs());
-} catch (Throwable t) {
-String msg = String.format("Could not clean up log dirs for 
broker at %s",
-address(broker));
-log.error(msg, t);
-throw new RuntimeException(msg, t);
-}
-}
-}
-
-private void stopZK() {
-try {
-zookeeper.shutdown();
-} catch (Throwable t) {
-String msg = String.format("Could not shutdown zookeeper at %s", 
zKConnectString());
-log.error(msg, t);
-throw new RuntimeException(msg, t);
-}
+/**
+ * Restarts the Kafka brokers. This can be called after {@link 
#stopTemporarily()}. Note that if the Kafka brokers
+ * need to be listening on the same ports as earlier, the {@link 
#brokerConfig} should contain the
+ * {@link SocketServerConfigs#LISTENERS_CONFIG} property and it should use 
a fixed non-zero free port. Also note that this is
+ * only possible when {@code numBrokers} is 1.
+ */
+public void restart() {
+cluster.brokers().values().forEach(BrokerServer::startup);
 }
 
-private static void putIfAbsent(final Properties props, final String 
propertyKey, final Object propertyValue) {
-if (!props.containsKey(propertyKey)) {
-props.put(propertyKey, propertyValue);
-}
+/**
+ * Stop the brokers (and controllers) in the Kafka cluster. This can be 
used to test Connect's functionality

Review Comment:
   Ah, thanks for the clarification! I made this change in response to [a prior 
comment](https://github.com/apache/kafka/pull/16599#discussion_r1682557520) 
that had me thinking that running in combined mode meant it would be impossible 
to stop brokers separately from controllers. Given this new information I've 
reverted the commit I made in response to that comment.
   
   I think it should be enough (at least for the sake of this migration) to 
just run in combined mode. We could definitely add tests later on to verify 
controller connection.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-24 Thread via GitHub


C0urante commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1690265815


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -199,149 +153,74 @@ private void doStart() {
 producer = new KafkaProducer<>(producerProps, new 
ByteArraySerializer(), new ByteArraySerializer());
 }
 
-public void stopOnlyKafka() {
-stop(false, false);
-}
-
-public void stop() {
-stop(true, true);
-}
-
-private void stop(boolean deleteLogDirs, boolean stopZK) {
-maybeShutDownProducer();
-triggerBrokerShutdown();
-awaitBrokerShutdown();
-
-if (deleteLogDirs)
-deleteLogDirs();
-
-if (stopZK)
-stopZK();
-}
-
-private void maybeShutDownProducer() {
-try {
-if (producer != null) {
-producer.close();
-}
-} catch (Exception e) {
-log.error("Could not shutdown producer ", e);
-throw new RuntimeException("Could not shutdown producer", e);
-}
-}
-
-private void triggerBrokerShutdown() {
-for (KafkaServer broker : brokers) {
-try {
-broker.shutdown();
-} catch (Throwable t) {
-String msg = String.format("Could not shutdown broker at %s", 
address(broker));
-log.error(msg, t);
-throw new RuntimeException(msg, t);
-}
-}
-}
-
-private void awaitBrokerShutdown() {
-for (KafkaServer broker : brokers) {
-try {
-broker.awaitShutdown();
-} catch (Throwable t) {
-String msg = String.format("Failed while awaiting shutdown of 
broker at %s", address(broker));
-log.error(msg, t);
-throw new RuntimeException(msg, t);
-}
-}
-}
-
-private void deleteLogDirs() {
-for (KafkaServer broker : brokers) {
-try {
-log.info("Cleaning up kafka log dirs at {}", 
broker.config().logDirs());
-CoreUtils.delete(broker.config().logDirs());
-} catch (Throwable t) {
-String msg = String.format("Could not clean up log dirs for 
broker at %s",
-address(broker));
-log.error(msg, t);
-throw new RuntimeException(msg, t);
-}
-}
-}
-
-private void stopZK() {
-try {
-zookeeper.shutdown();
-} catch (Throwable t) {
-String msg = String.format("Could not shutdown zookeeper at %s", 
zKConnectString());
-log.error(msg, t);
-throw new RuntimeException(msg, t);
-}
+/**
+ * Restarts the Kafka brokers. This can be called after {@link 
#stopTemporarily()}. Note that if the Kafka brokers
+ * need to be listening on the same ports as earlier, the {@link 
#brokerConfig} should contain the
+ * {@link SocketServerConfigs#LISTENERS_CONFIG} property and it should use 
a fixed non-zero free port. Also note that this is
+ * only possible when {@code numBrokers} is 1.
+ */
+public void restart() {
+cluster.brokers().values().forEach(BrokerServer::startup);
 }
 
-private static void putIfAbsent(final Properties props, final String 
propertyKey, final Object propertyValue) {
-if (!props.containsKey(propertyKey)) {
-props.put(propertyKey, propertyValue);
-}
+/**
+ * Stop the brokers (and controllers) in the Kafka cluster. This can be 
used to test Connect's functionality
+ * when the backing Kafka cluster goes offline.
+ */
+public void stopTemporarily() {
+cluster.brokers().values().forEach(BrokerServer::shutdown);
+cluster.brokers().values().forEach(BrokerServer::awaitShutdown);
 }
 
-private String createLogDir() {
-try {
-return 
Files.createTempDirectory(getClass().getSimpleName()).toString();
-} catch (IOException e) {
-log.error("Unable to create temporary log directory", e);
-throw new ConnectException("Unable to create temporary log 
directory", e);
+public void stop() {
+AtomicReference shutdownFailure = new AtomicReference<>();
+Utils.closeQuietly(producer, "producer for embedded Kafka cluster", 
shutdownFailure);
+Utils.closeQuietly(cluster, "embedded Kafka cluster", shutdownFailure);
+if (shutdownFailure.get() != null) {
+throw new ConnectException("Failed to shut down producer / 
embedded Kafka cluster", shutdownFailure.get());
 }
 }
 
 public String bootstrapServers() {
-return Arrays.stream(brokers)
-.map(this::address)
-.collect(Collectors.joining(","));
-}
-
-public String address(KafkaServer server) {
-fina

Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-24 Thread via GitHub


chia7712 commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1689474552


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -447,9 +326,9 @@ public void createTopic(String topic, int partitions, int 
replication, Map topicConfig, Map adminClientConfig) {
-if (replication > brokers.length) {
+if (replication > cluster.brokers().size()) {

Review Comment:
   Is this a kind of fail fast? otherwise, the admin will get the same error 
from server.



##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -199,149 +153,74 @@ private void doStart() {
 producer = new KafkaProducer<>(producerProps, new 
ByteArraySerializer(), new ByteArraySerializer());
 }
 
-public void stopOnlyKafka() {
-stop(false, false);
-}
-
-public void stop() {
-stop(true, true);
-}
-
-private void stop(boolean deleteLogDirs, boolean stopZK) {
-maybeShutDownProducer();
-triggerBrokerShutdown();
-awaitBrokerShutdown();
-
-if (deleteLogDirs)
-deleteLogDirs();
-
-if (stopZK)
-stopZK();
-}
-
-private void maybeShutDownProducer() {
-try {
-if (producer != null) {
-producer.close();
-}
-} catch (Exception e) {
-log.error("Could not shutdown producer ", e);
-throw new RuntimeException("Could not shutdown producer", e);
-}
-}
-
-private void triggerBrokerShutdown() {
-for (KafkaServer broker : brokers) {
-try {
-broker.shutdown();
-} catch (Throwable t) {
-String msg = String.format("Could not shutdown broker at %s", 
address(broker));
-log.error(msg, t);
-throw new RuntimeException(msg, t);
-}
-}
-}
-
-private void awaitBrokerShutdown() {
-for (KafkaServer broker : brokers) {
-try {
-broker.awaitShutdown();
-} catch (Throwable t) {
-String msg = String.format("Failed while awaiting shutdown of 
broker at %s", address(broker));
-log.error(msg, t);
-throw new RuntimeException(msg, t);
-}
-}
-}
-
-private void deleteLogDirs() {
-for (KafkaServer broker : brokers) {
-try {
-log.info("Cleaning up kafka log dirs at {}", 
broker.config().logDirs());
-CoreUtils.delete(broker.config().logDirs());
-} catch (Throwable t) {
-String msg = String.format("Could not clean up log dirs for 
broker at %s",
-address(broker));
-log.error(msg, t);
-throw new RuntimeException(msg, t);
-}
-}
-}
-
-private void stopZK() {
-try {
-zookeeper.shutdown();
-} catch (Throwable t) {
-String msg = String.format("Could not shutdown zookeeper at %s", 
zKConnectString());
-log.error(msg, t);
-throw new RuntimeException(msg, t);
-}
+/**
+ * Restarts the Kafka brokers. This can be called after {@link 
#stopTemporarily()}. Note that if the Kafka brokers
+ * need to be listening on the same ports as earlier, the {@link 
#brokerConfig} should contain the
+ * {@link SocketServerConfigs#LISTENERS_CONFIG} property and it should use 
a fixed non-zero free port. Also note that this is
+ * only possible when {@code numBrokers} is 1.
+ */
+public void restart() {
+cluster.brokers().values().forEach(BrokerServer::startup);
 }
 
-private static void putIfAbsent(final Properties props, final String 
propertyKey, final Object propertyValue) {
-if (!props.containsKey(propertyKey)) {
-props.put(propertyKey, propertyValue);
-}
+/**
+ * Stop the brokers (and controllers) in the Kafka cluster. This can be 
used to test Connect's functionality
+ * when the backing Kafka cluster goes offline.
+ */
+public void stopTemporarily() {
+cluster.brokers().values().forEach(BrokerServer::shutdown);
+cluster.brokers().values().forEach(BrokerServer::awaitShutdown);
 }
 
-private String createLogDir() {
-try {
-return 
Files.createTempDirectory(getClass().getSimpleName()).toString();
-} catch (IOException e) {
-log.error("Unable to create temporary log directory", e);
-throw new ConnectException("Unable to create temporary log 
directory", e);
+public void stop() {
+AtomicReference shutdownFailure = new AtomicReference<>();
+Utils.closeQuietly(producer, "producer for embedded Kafka cluster", 
shutdownFailure);
+Utils.closeQuietly(cluster, "embedde

Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-23 Thread via GitHub


chia7712 commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1689006610


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -95,98 +88,60 @@
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and 
specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link 
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can 
be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. 
Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to 
easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
 private static final Logger log = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120);
 
-// Kafka Config
-private final KafkaServer[] brokers;
+private final KafkaClusterTestKit cluster;
 private final Properties brokerConfig;
-private final Time time = Time.SYSTEM;
-private final int[] currentBrokerPorts;
-private final String[] currentBrokerLogDirs;
-private final boolean hasListenerConfig;
+private final Map clientConfigs;
 
-final Map clientConfigs;
-
-private EmbeddedZookeeper zookeeper = null;
-private ListenerName listenerName = new ListenerName("PLAINTEXT");
 private KafkaProducer producer;
 
-public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig) {
+public EmbeddedKafkaCluster(final int numBrokers, final Properties 
brokerConfig) {
 this(numBrokers, brokerConfig, Collections.emptyMap());
 }
 
 public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig,
-final Map clientConfigs) {
-brokers = new KafkaServer[numBrokers];
-currentBrokerPorts = new int[numBrokers];
-currentBrokerLogDirs = new String[numBrokers];
-this.brokerConfig = brokerConfig;
-// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we 
track whether
-// a listener config is defined during initialization in order to know 
if it's
-// safe to override it
-hasListenerConfig = 
brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+   final Properties brokerConfig,
+   final Map clientConfigs) {
+addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+try {
+KafkaClusterTestKit.Builder clusterBuilder = new 
KafkaClusterTestKit.Builder(

Review Comment:
   @C0urante Thanks for your response. I move the discussion to 
https://issues.apache.org/jira/browse/KAFKA-17174 to avoid unnecessary noise in 
this PR :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-23 Thread via GitHub


C0urante commented on PR #16599:
URL: https://github.com/apache/kafka/pull/16599#issuecomment-2245782479

   @mimaison It looks like my latest commits went through. Would you mind 
giving this another round when you have a chance? Happy to continue the 
conversation about the `ClusterTestExtensions` API too, but figure that that 
can take place concurrently with a review of the latest changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-23 Thread via GitHub


C0urante commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1688407540


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -95,98 +88,60 @@
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and 
specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link 
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can 
be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. 
Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to 
easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
 private static final Logger log = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120);
 
-// Kafka Config
-private final KafkaServer[] brokers;
+private final KafkaClusterTestKit cluster;
 private final Properties brokerConfig;
-private final Time time = Time.SYSTEM;
-private final int[] currentBrokerPorts;
-private final String[] currentBrokerLogDirs;
-private final boolean hasListenerConfig;
+private final Map clientConfigs;
 
-final Map clientConfigs;
-
-private EmbeddedZookeeper zookeeper = null;
-private ListenerName listenerName = new ListenerName("PLAINTEXT");
 private KafkaProducer producer;
 
-public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig) {
+public EmbeddedKafkaCluster(final int numBrokers, final Properties 
brokerConfig) {
 this(numBrokers, brokerConfig, Collections.emptyMap());
 }
 
 public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig,
-final Map clientConfigs) {
-brokers = new KafkaServer[numBrokers];
-currentBrokerPorts = new int[numBrokers];
-currentBrokerLogDirs = new String[numBrokers];
-this.brokerConfig = brokerConfig;
-// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we 
track whether
-// a listener config is defined during initialization in order to know 
if it's
-// safe to override it
-hasListenerConfig = 
brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+   final Properties brokerConfig,
+   final Map clientConfigs) {
+addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+try {
+KafkaClusterTestKit.Builder clusterBuilder = new 
KafkaClusterTestKit.Builder(

Review Comment:
   I agree that a unified testing style will be better in the long run, but 
we're pretty far away from that with Connect's integration tests.
   
   For the vast majority of our tests, we can't easily switch over to the 
`ClusterTestExtensions` API because our `EmbeddedConnectCluster` automatically 
spins up its own `EmbeddedKafkaCluster`. If we used the annotation-based API, 
we'd have to modify our `EmbeddedConnectCluster` class to not do that and to 
instead accept a `ClusterInstance` that's been created and possibly started 
out-of-band, which would actually be less convenient than our current embedded 
cluster API and IMO is not a path we should pursue.
   
   I also don't think that adopting the `ClusterTestExtensions` API piecemeal 
in Connect (like the proposal in KAFKA-17174 to migrate the 
`OffsetsApiIntegrationTest`) without a long-term strategy to migrate our entire 
collection of integration tests is a good idea. Right now Connect uses a 
consistent style for our all of our integration tests and fragmenting that 
style will just make life harder for the people actually writing and 
maintaining these tests.
   
   TL;DR: While I'm also not opposed to adopting the `ClusterTestExtensions` 
API in Conne

Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-23 Thread via GitHub


C0urante commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1688407540


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -95,98 +88,60 @@
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and 
specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link 
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can 
be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. 
Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to 
easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
 private static final Logger log = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120);
 
-// Kafka Config
-private final KafkaServer[] brokers;
+private final KafkaClusterTestKit cluster;
 private final Properties brokerConfig;
-private final Time time = Time.SYSTEM;
-private final int[] currentBrokerPorts;
-private final String[] currentBrokerLogDirs;
-private final boolean hasListenerConfig;
+private final Map clientConfigs;
 
-final Map clientConfigs;
-
-private EmbeddedZookeeper zookeeper = null;
-private ListenerName listenerName = new ListenerName("PLAINTEXT");
 private KafkaProducer producer;
 
-public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig) {
+public EmbeddedKafkaCluster(final int numBrokers, final Properties 
brokerConfig) {
 this(numBrokers, brokerConfig, Collections.emptyMap());
 }
 
 public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig,
-final Map clientConfigs) {
-brokers = new KafkaServer[numBrokers];
-currentBrokerPorts = new int[numBrokers];
-currentBrokerLogDirs = new String[numBrokers];
-this.brokerConfig = brokerConfig;
-// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we 
track whether
-// a listener config is defined during initialization in order to know 
if it's
-// safe to override it
-hasListenerConfig = 
brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+   final Properties brokerConfig,
+   final Map clientConfigs) {
+addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+try {
+KafkaClusterTestKit.Builder clusterBuilder = new 
KafkaClusterTestKit.Builder(

Review Comment:
   I agree that a unified testing style will be better in the long run, but 
we're pretty far away from that with Connect's integration tests.
   
   For the vast majority of our tests, we can't easily switch over to the 
`ClusterTestExtensions` API because our `EmbeddedConnectCluster` automatically 
spins up its own `EmbeddedKafkaCluster`. If we used the annotation-based API, 
we'd have to modify our `EmbeddedConnectCluster` class to not create its own 
embedded Kafka cluster and to instead accept a `ClusterInstance` that's been 
created and possibly started out-of-band, which would actually be less 
convenient than our current embedded cluster API and IMO is not a path we 
should pursue.
   
   I also don't think that adopting the `ClusterTestExtensions` API piecemeal 
in Connect (like the proposal in KAFKA-17174 to migrate the 
`OffsetsApiIntegrationTest`) without a long-term strategy to migrate our entire 
collection of integration tests is a good idea. Right now Connect uses a 
consistent style for our all of our integration tests and fragmenting that 
style will just make life harder for the people actually writing and 
maintaining these tests.
   
   TL;DR: While I'm also not opposed to adopting the `Clust

Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-23 Thread via GitHub


chia7712 commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1688198603


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -95,98 +88,60 @@
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and 
specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link 
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can 
be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. 
Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to 
easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
 private static final Logger log = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120);
 
-// Kafka Config
-private final KafkaServer[] brokers;
+private final KafkaClusterTestKit cluster;
 private final Properties brokerConfig;
-private final Time time = Time.SYSTEM;
-private final int[] currentBrokerPorts;
-private final String[] currentBrokerLogDirs;
-private final boolean hasListenerConfig;
+private final Map clientConfigs;
 
-final Map clientConfigs;
-
-private EmbeddedZookeeper zookeeper = null;
-private ListenerName listenerName = new ListenerName("PLAINTEXT");
 private KafkaProducer producer;
 
-public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig) {
+public EmbeddedKafkaCluster(final int numBrokers, final Properties 
brokerConfig) {
 this(numBrokers, brokerConfig, Collections.emptyMap());
 }
 
 public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig,
-final Map clientConfigs) {
-brokers = new KafkaServer[numBrokers];
-currentBrokerPorts = new int[numBrokers];
-currentBrokerLogDirs = new String[numBrokers];
-this.brokerConfig = brokerConfig;
-// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we 
track whether
-// a listener config is defined during initialization in order to know 
if it's
-// safe to override it
-hasListenerConfig = 
brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+   final Properties brokerConfig,
+   final Map clientConfigs) {
+addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+try {
+KafkaClusterTestKit.Builder clusterBuilder = new 
KafkaClusterTestKit.Builder(

Review Comment:
   I just want to apply the new test infra to all modules. That can unify the 
code base and all modules can derive the benefits when we improve it.
   
   BTW, I don't want to block this PR, but it seems to me some tests in connect 
module can leverage the new test infra first. We can work on them concurrently 
:)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-23 Thread via GitHub


mimaison commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1687978245


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -95,98 +88,60 @@
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and 
specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link 
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can 
be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. 
Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to 
easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
 private static final Logger log = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120);
 
-// Kafka Config
-private final KafkaServer[] brokers;
+private final KafkaClusterTestKit cluster;
 private final Properties brokerConfig;
-private final Time time = Time.SYSTEM;
-private final int[] currentBrokerPorts;
-private final String[] currentBrokerLogDirs;
-private final boolean hasListenerConfig;
+private final Map clientConfigs;
 
-final Map clientConfigs;
-
-private EmbeddedZookeeper zookeeper = null;
-private ListenerName listenerName = new ListenerName("PLAINTEXT");
 private KafkaProducer producer;
 
-public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig) {
+public EmbeddedKafkaCluster(final int numBrokers, final Properties 
brokerConfig) {
 this(numBrokers, brokerConfig, Collections.emptyMap());
 }
 
 public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig,
-final Map clientConfigs) {
-brokers = new KafkaServer[numBrokers];
-currentBrokerPorts = new int[numBrokers];
-currentBrokerLogDirs = new String[numBrokers];
-this.brokerConfig = brokerConfig;
-// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we 
track whether
-// a listener config is defined during initialization in order to know 
if it's
-// safe to override it
-hasListenerConfig = 
brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+   final Properties brokerConfig,
+   final Map clientConfigs) {
+addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+try {
+KafkaClusterTestKit.Builder clusterBuilder = new 
KafkaClusterTestKit.Builder(

Review Comment:
   Thanks for the heads up! There are so many refactorings in progress, it's 
hard to keep up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-23 Thread via GitHub


chia7712 commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1687899344


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -95,98 +88,60 @@
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and 
specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link 
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can 
be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. 
Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to 
easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
 private static final Logger log = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120);
 
-// Kafka Config
-private final KafkaServer[] brokers;
+private final KafkaClusterTestKit cluster;
 private final Properties brokerConfig;
-private final Time time = Time.SYSTEM;
-private final int[] currentBrokerPorts;
-private final String[] currentBrokerLogDirs;
-private final boolean hasListenerConfig;
+private final Map clientConfigs;
 
-final Map clientConfigs;
-
-private EmbeddedZookeeper zookeeper = null;
-private ListenerName listenerName = new ListenerName("PLAINTEXT");
 private KafkaProducer producer;
 
-public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig) {
+public EmbeddedKafkaCluster(final int numBrokers, final Properties 
brokerConfig) {
 this(numBrokers, brokerConfig, Collections.emptyMap());
 }
 
 public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig,
-final Map clientConfigs) {
-brokers = new KafkaServer[numBrokers];
-currentBrokerPorts = new int[numBrokers];
-currentBrokerLogDirs = new String[numBrokers];
-this.brokerConfig = brokerConfig;
-// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we 
track whether
-// a listener config is defined during initialization in order to know 
if it's
-// safe to override it
-hasListenerConfig = 
brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+   final Properties brokerConfig,
+   final Map clientConfigs) {
+addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+try {
+KafkaClusterTestKit.Builder clusterBuilder = new 
KafkaClusterTestKit.Builder(

Review Comment:
   For another benefit: #16499 introduces thread-leak detection for method 
level, and it is applied automatically if we adopt `ClusterTestExtensions` 
:smile: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-21 Thread via GitHub


chia7712 commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1685882826


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -95,98 +88,60 @@
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and 
specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link 
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can 
be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. 
Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to 
easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
 private static final Logger log = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120);
 
-// Kafka Config
-private final KafkaServer[] brokers;
+private final KafkaClusterTestKit cluster;
 private final Properties brokerConfig;
-private final Time time = Time.SYSTEM;
-private final int[] currentBrokerPorts;
-private final String[] currentBrokerLogDirs;
-private final boolean hasListenerConfig;
+private final Map clientConfigs;
 
-final Map clientConfigs;
-
-private EmbeddedZookeeper zookeeper = null;
-private ListenerName listenerName = new ListenerName("PLAINTEXT");
 private KafkaProducer producer;
 
-public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig) {
+public EmbeddedKafkaCluster(final int numBrokers, final Properties 
brokerConfig) {
 this(numBrokers, brokerConfig, Collections.emptyMap());
 }
 
 public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig,
-final Map clientConfigs) {
-brokers = new KafkaServer[numBrokers];
-currentBrokerPorts = new int[numBrokers];
-currentBrokerLogDirs = new String[numBrokers];
-this.brokerConfig = brokerConfig;
-// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we 
track whether
-// a listener config is defined during initialization in order to know 
if it's
-// safe to override it
-hasListenerConfig = 
brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+   final Properties brokerConfig,
+   final Map clientConfigs) {
+addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+try {
+KafkaClusterTestKit.Builder clusterBuilder = new 
KafkaClusterTestKit.Builder(

Review Comment:
   I file https://issues.apache.org/jira/browse/KAFKA-17174 to address above 
comments. Please feel free to raise objection. thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-21 Thread via GitHub


chia7712 commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1685723572


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -95,98 +88,60 @@
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and 
specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link 
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can 
be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. 
Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to 
easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
 private static final Logger log = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120);
 
-// Kafka Config
-private final KafkaServer[] brokers;
+private final KafkaClusterTestKit cluster;
 private final Properties brokerConfig;
-private final Time time = Time.SYSTEM;
-private final int[] currentBrokerPorts;
-private final String[] currentBrokerLogDirs;
-private final boolean hasListenerConfig;
+private final Map clientConfigs;
 
-final Map clientConfigs;
-
-private EmbeddedZookeeper zookeeper = null;
-private ListenerName listenerName = new ListenerName("PLAINTEXT");
 private KafkaProducer producer;
 
-public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig) {
+public EmbeddedKafkaCluster(final int numBrokers, final Properties 
brokerConfig) {
 this(numBrokers, brokerConfig, Collections.emptyMap());
 }
 
 public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig,
-final Map clientConfigs) {
-brokers = new KafkaServer[numBrokers];
-currentBrokerPorts = new int[numBrokers];
-currentBrokerLogDirs = new String[numBrokers];
-this.brokerConfig = brokerConfig;
-// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we 
track whether
-// a listener config is defined during initialization in order to know 
if it's
-// safe to override it
-hasListenerConfig = 
brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+   final Properties brokerConfig,
+   final Map clientConfigs) {
+addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+try {
+KafkaClusterTestKit.Builder clusterBuilder = new 
KafkaClusterTestKit.Builder(

Review Comment:
   Or we can migrate the tests having few uses of `EmbeddedKafkaCluster` to 
`ClusterInstance`. For instance: `OffsetsApiIntegrationTest`. The other tests 
sticking on `EmbeddedKafkaCluster` can be migrated later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-21 Thread via GitHub


chia7712 commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1685709519


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -95,98 +88,60 @@
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and 
specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link 
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can 
be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. 
Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to 
easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
 private static final Logger log = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120);
 
-// Kafka Config
-private final KafkaServer[] brokers;
+private final KafkaClusterTestKit cluster;
 private final Properties brokerConfig;
-private final Time time = Time.SYSTEM;
-private final int[] currentBrokerPorts;
-private final String[] currentBrokerLogDirs;
-private final boolean hasListenerConfig;
+private final Map clientConfigs;
 
-final Map clientConfigs;
-
-private EmbeddedZookeeper zookeeper = null;
-private ListenerName listenerName = new ListenerName("PLAINTEXT");
 private KafkaProducer producer;
 
-public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig) {
+public EmbeddedKafkaCluster(final int numBrokers, final Properties 
brokerConfig) {
 this(numBrokers, brokerConfig, Collections.emptyMap());
 }
 
 public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig,
-final Map clientConfigs) {
-brokers = new KafkaServer[numBrokers];
-currentBrokerPorts = new int[numBrokers];
-currentBrokerLogDirs = new String[numBrokers];
-this.brokerConfig = brokerConfig;
-// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we 
track whether
-// a listener config is defined during initialization in order to know 
if it's
-// safe to override it
-hasListenerConfig = 
brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+   final Properties brokerConfig,
+   final Map clientConfigs) {
+addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+try {
+KafkaClusterTestKit.Builder clusterBuilder = new 
KafkaClusterTestKit.Builder(

Review Comment:
   Is it possible to leverage `ClusterInstance` instead of using 
`KafkaClusterTestKit` directly? We had introduced the new test infra to some 
storage tests 
(https://github.com/apache/kafka/blob/defcbb51ee98ae766e4fcf41ed7631ef1b0cdd15/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java#L50)
   
   Leveraging the `ClusterInstance` can have following benefits:
   
   1. unify the test style
   2. easy to test for all types (zk, kraft, co-kraft)
   3. easy to migrate



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-19 Thread via GitHub


C0urante commented on PR #16599:
URL: https://github.com/apache/kafka/pull/16599#issuecomment-2239959002

   Well, I tried to push a new commit at least. Looks like airplane wifi is 
strong enough to publish GitHub comments but not enough to push commits. Will 
keep retrying until the commit lands 🥴


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-19 Thread via GitHub


C0urante commented on PR #16599:
URL: https://github.com/apache/kafka/pull/16599#issuecomment-2239932531

   Thanks Mickael! Pushed a new commit, ready for another round when you have a 
moment.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-19 Thread via GitHub


C0urante commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1684858189


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##
@@ -694,12 +701,29 @@ public void testTasksFailOnInabilityToFence() throws 
Exception {
 )).all().get();
 }
 
-StartAndStopLatch connectorStart = connectorAndTaskStart(tasksMax);
-
 log.info("Bringing up connector with fresh slate; fencing should not 
be necessary");
 connect.configureConnector(CONNECTOR_NAME, props);
-assertConnectorStarted(connectorStart);
-// Verify that the connector and its tasks have been able to start 
successfully
+
+// Hack: There is a small chance that our recent ACL updates for the 
connector have
+// not yet been propagated across the entire Kafka cluster, and that 
our connector
+// will fail on startup when it tries to list the end offsets of the 
worker's offsets topic
+// So, we implement some retry logic here to add a layer of resiliency 
in that case
+waitForCondition(
+() -> {
+ConnectorStateInfo status = 
connect.connectorStatus(CONNECTOR_NAME);
+if ("RUNNING".equals(status.connector().state())) {
+return true;
+} else if ("FAILED".equals(status.connector().state())) {
+log.debug("Restarting failed connector {}", 
CONNECTOR_NAME);
+connect.restartConnector(CONNECTOR_NAME);
+}
+return false;
+},
+30_000,

Review Comment:
   They're the same value, but we're retrying for slightly different reasons; 
delays in ACL propagation won't necessarily be caught during pre-flight 
connector config validation.
   
   I've pulled this out into a separate `ACL_PROPAGATION_TIMEOUT_MS` value; 
LMKWYT.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-19 Thread via GitHub


C0urante commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1684857716


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -95,98 +88,60 @@
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and 
specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link 
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can 
be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. 
Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to 
easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
 private static final Logger log = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120);
 
-// Kafka Config
-private final KafkaServer[] brokers;
+private final KafkaClusterTestKit cluster;
 private final Properties brokerConfig;
-private final Time time = Time.SYSTEM;
-private final int[] currentBrokerPorts;
-private final String[] currentBrokerLogDirs;
-private final boolean hasListenerConfig;
+private final Map clientConfigs;
 
-final Map clientConfigs;
-
-private EmbeddedZookeeper zookeeper = null;
-private ListenerName listenerName = new ListenerName("PLAINTEXT");
 private KafkaProducer producer;
 
-public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig) {
+public EmbeddedKafkaCluster(final int numBrokers, final Properties 
brokerConfig) {
 this(numBrokers, brokerConfig, Collections.emptyMap());
 }
 
 public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig,
-final Map clientConfigs) {
-brokers = new KafkaServer[numBrokers];
-currentBrokerPorts = new int[numBrokers];
-currentBrokerLogDirs = new String[numBrokers];
-this.brokerConfig = brokerConfig;
-// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we 
track whether
-// a listener config is defined during initialization in order to know 
if it's
-// safe to override it
-hasListenerConfig = 
brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+   final Properties brokerConfig,
+   final Map clientConfigs) {
+addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+try {
+KafkaClusterTestKit.Builder clusterBuilder = new 
KafkaClusterTestKit.Builder(
+new TestKitNodes.Builder()
+.setCombined(true)

Review Comment:
   Ah, interesting! I think it's possible to use a separate controller 
node/cluster but it's unnecessary for the purposes of our tests. IMO we should 
instead just rename the `stopOnlyBrokers` and `restartOnlyBrokers` methods to 
`stop` and `restart`, respectively.
   
   For the Connect integration tests, we largely treat Kafka like a black box 
that's usually on and, for some scenarios, may be turned off (or generally 
unavailable). FWICT we don't really do anything that requires the kind of 
granularity where we'd distinguish between brokers being unavailable and 
controllers being unavailable.
   
   It's also a little tricky to use separate controllers and brokers because we 
still have to be able to turn our embedded Kafka cluster off and back on again 
with the same port, which becomes difficult when you have to specify a fixed 
port to use across restarts for brokers only. You end up having to abuse the 
`setPerServerProperties` API to accomplish that, and while it's certainly 
possible, it is a bit ugly to expose that cleanly in our internal 
`EmbeddedKafkaCluster` API.
   
   Thoughts?



##
connect/

Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-18 Thread via GitHub


mimaison commented on code in PR #16599:
URL: https://github.com/apache/kafka/pull/16599#discussion_r1682557520


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -95,98 +88,60 @@
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and 
specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link 
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can 
be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. 
Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to 
easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
 private static final Logger log = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120);
 
-// Kafka Config
-private final KafkaServer[] brokers;
+private final KafkaClusterTestKit cluster;
 private final Properties brokerConfig;
-private final Time time = Time.SYSTEM;
-private final int[] currentBrokerPorts;
-private final String[] currentBrokerLogDirs;
-private final boolean hasListenerConfig;
+private final Map clientConfigs;
 
-final Map clientConfigs;
-
-private EmbeddedZookeeper zookeeper = null;
-private ListenerName listenerName = new ListenerName("PLAINTEXT");
 private KafkaProducer producer;
 
-public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig) {
+public EmbeddedKafkaCluster(final int numBrokers, final Properties 
brokerConfig) {
 this(numBrokers, brokerConfig, Collections.emptyMap());
 }
 
 public EmbeddedKafkaCluster(final int numBrokers,
-final Properties brokerConfig,
-final Map clientConfigs) {
-brokers = new KafkaServer[numBrokers];
-currentBrokerPorts = new int[numBrokers];
-currentBrokerLogDirs = new String[numBrokers];
-this.brokerConfig = brokerConfig;
-// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we 
track whether
-// a listener config is defined during initialization in order to know 
if it's
-// safe to override it
-hasListenerConfig = 
brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+   final Properties brokerConfig,
+   final Map clientConfigs) {
+addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+try {
+KafkaClusterTestKit.Builder clusterBuilder = new 
KafkaClusterTestKit.Builder(
+new TestKitNodes.Builder()
+.setCombined(true)

Review Comment:
   Have you tried having a separate controller? My concern is that we have a 
method `stopOnlyBrokers()` which would stop controllers too in combined mode.



##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -748,9 +625,10 @@ public KafkaProducer 
createProducer(Map producer
 return producer;
 }
 
-private static void putIfAbsent(final Map props, final 
String propertyKey, final Object propertyValue) {
-if (!props.containsKey(propertyKey)) {
-props.put(propertyKey, propertyValue);
-}
+private void addDefaultBrokerPropsIfAbsent(Properties brokerConfig, int 
numBrokers) {
+brokerConfig.putIfAbsent(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, 
"true");

Review Comment:
   Do we need that? This is the default value



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##
@@ -694,12 +701,29 @@ public void testTasksFailOnInabilityToFence() throws 
Exception {
 )).all().get();
 }
 
-StartAndStopLatch co

Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-17 Thread via GitHub


C0urante commented on PR #16599:
URL: https://github.com/apache/kafka/pull/16599#issuecomment-2233996052

   Found one more flaky test. It doesn't seem like that flakiness is inherent 
with KRaft mode, but it may be exacerbated by it. Pushed a fix.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-16 Thread via GitHub


C0urante commented on PR #16599:
URL: https://github.com/apache/kafka/pull/16599#issuecomment-2231872728

   Spoke too soon, but should be good to go now. The one current failing MM2 
test appears to be due to the bug that's fixed by 
https://github.com/apache/kafka/pull/16598, so it should be safe to ignore.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-15 Thread via GitHub


yashmayya closed pull request #13375: KAFKA-14569: Migrate Connect's 
integration test EmbeddedKafkaCluster from ZK to KRaft mode
URL: https://github.com/apache/kafka/pull/13375


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-15 Thread via GitHub


C0urante commented on PR #16599:
URL: https://github.com/apache/kafka/pull/16599#issuecomment-2229780003

   Okay, this should be good to go--verified the build locally and ran the SSL 
MM2 integration tests; hoping that's enough coverage for now, and that Jenkins 
will confirm the rest.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-07-15 Thread via GitHub


C0urante commented on PR #13375:
URL: https://github.com/apache/kafka/pull/13375#issuecomment-2229508987

   I've published https://github.com/apache/kafka/pull/16599, which is just a 
squash+rebase of this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-05-30 Thread via GitHub


yashmayya commented on PR #13375:
URL: https://github.com/apache/kafka/pull/13375#issuecomment-2140247839

   @mdedetrich apologies for the late response, I didn't get notified for your 
comment oddly enough. Please feel free to take over, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]

2024-04-19 Thread via GitHub


mdedetrich commented on PR #13375:
URL: https://github.com/apache/kafka/pull/13375#issuecomment-2066602056

   @yashmayya Are you still working on this to get it over the finish line or 
is it okay for me to take over?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org