Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
C0urante merged PR #16599: URL: https://github.com/apache/kafka/pull/16599 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
C0urante commented on PR #16599: URL: https://github.com/apache/kafka/pull/16599#issuecomment-2256133214 Thanks @chia7712! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
C0urante commented on PR #16599: URL: https://github.com/apache/kafka/pull/16599#issuecomment-2253149294 Think I found the root cause. Pushed a fix, will try to get three CI runs without a failure on that test case before merging. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
C0urante commented on PR #16599: URL: https://github.com/apache/kafka/pull/16599#issuecomment-2253087786 I've noticed that the `IdentityReplicationIntegrationTest::testReplication` case has failed four times on this PR branch with the same cause: ``` org.opentest4j.AssertionFailedError: topic config was not synced ==> expected: but was: at app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:308) at java.base@21.0.3/java.lang.reflect.Method.invoke(Method.java:580) at java.base@21.0.3/java.util.ArrayList.forEach(ArrayList.java:1596) at java.base@21.0.3/java.util.ArrayList.forEach(ArrayList.java:1596) ``` AFAIK this exact failure hasn't been occurring on trunk. I'd like to investigate a little more before merging. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
mimaison commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1692881997 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -199,149 +153,74 @@ private void doStart() { producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer()); } -public void stopOnlyKafka() { -stop(false, false); -} - -public void stop() { -stop(true, true); -} - -private void stop(boolean deleteLogDirs, boolean stopZK) { -maybeShutDownProducer(); -triggerBrokerShutdown(); -awaitBrokerShutdown(); - -if (deleteLogDirs) -deleteLogDirs(); - -if (stopZK) -stopZK(); -} - -private void maybeShutDownProducer() { -try { -if (producer != null) { -producer.close(); -} -} catch (Exception e) { -log.error("Could not shutdown producer ", e); -throw new RuntimeException("Could not shutdown producer", e); -} -} - -private void triggerBrokerShutdown() { -for (KafkaServer broker : brokers) { -try { -broker.shutdown(); -} catch (Throwable t) { -String msg = String.format("Could not shutdown broker at %s", address(broker)); -log.error(msg, t); -throw new RuntimeException(msg, t); -} -} -} - -private void awaitBrokerShutdown() { -for (KafkaServer broker : brokers) { -try { -broker.awaitShutdown(); -} catch (Throwable t) { -String msg = String.format("Failed while awaiting shutdown of broker at %s", address(broker)); -log.error(msg, t); -throw new RuntimeException(msg, t); -} -} -} - -private void deleteLogDirs() { -for (KafkaServer broker : brokers) { -try { -log.info("Cleaning up kafka log dirs at {}", broker.config().logDirs()); -CoreUtils.delete(broker.config().logDirs()); -} catch (Throwable t) { -String msg = String.format("Could not clean up log dirs for broker at %s", -address(broker)); -log.error(msg, t); -throw new RuntimeException(msg, t); -} -} -} - -private void stopZK() { -try { -zookeeper.shutdown(); -} catch (Throwable t) { -String msg = String.format("Could not shutdown zookeeper at %s", zKConnectString()); -log.error(msg, t); -throw new RuntimeException(msg, t); -} +/** + * Restarts the Kafka brokers. This can be called after {@link #stopTemporarily()}. Note that if the Kafka brokers + * need to be listening on the same ports as earlier, the {@link #brokerConfig} should contain the + * {@link SocketServerConfigs#LISTENERS_CONFIG} property and it should use a fixed non-zero free port. Also note that this is + * only possible when {@code numBrokers} is 1. + */ +public void restart() { +cluster.brokers().values().forEach(BrokerServer::startup); } -private static void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) { -if (!props.containsKey(propertyKey)) { -props.put(propertyKey, propertyValue); -} +/** + * Stop the brokers (and controllers) in the Kafka cluster. This can be used to test Connect's functionality Review Comment: That will teach me to trust method names! Thanks for the explanation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
C0urante commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1691585017 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -95,98 +87,60 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; /** - * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for - * integration tests. + * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the + * specified number of brokers and the specified broker properties. This can be used for integration tests and is + * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be + * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data, + * consume data etc. */ public class EmbeddedKafkaCluster { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); -private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); +private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); -// Kafka Config -private final KafkaServer[] brokers; +private final KafkaClusterTestKit cluster; private final Properties brokerConfig; -private final Time time = Time.SYSTEM; -private final int[] currentBrokerPorts; -private final String[] currentBrokerLogDirs; -private final boolean hasListenerConfig; +private final Map clientConfigs; -final Map clientConfigs; - -private EmbeddedZookeeper zookeeper = null; -private ListenerName listenerName = new ListenerName("PLAINTEXT"); private KafkaProducer producer; -public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig) { +public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) { this(numBrokers, brokerConfig, Collections.emptyMap()); } public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig, -final Map clientConfigs) { -brokers = new KafkaServer[numBrokers]; -currentBrokerPorts = new int[numBrokers]; -currentBrokerLogDirs = new String[numBrokers]; -this.brokerConfig = brokerConfig; -// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether -// a listener config is defined during initialization in order to know if it's -// safe to override it -hasListenerConfig = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null; + final Properties brokerConfig, + final Map clientConfigs) { +addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers); +try { +KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder( +new TestKitNodes.Builder() +.setCombined(true) +.setNumBrokerNodes(numBrokers) +.setNumControllerNodes(numBrokers) Review Comment: Great call! I tested locally and fixing the number of controller nodes at 1 seemed to reduce the runtime of the `InternalTopicsIntegrationTest::testCreateInternalTopicsWithDefaultSettings` case by about 6-7 seconds on my laptop. There aren't many other tests that use multiple brokers but I've hardcoded the number of controllers to 1 in my latest commit just to play it safe. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
chia7712 commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1691331640 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -95,98 +87,60 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; /** - * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for - * integration tests. + * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the + * specified number of brokers and the specified broker properties. This can be used for integration tests and is + * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be + * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data, + * consume data etc. */ public class EmbeddedKafkaCluster { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); -private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); +private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); -// Kafka Config -private final KafkaServer[] brokers; +private final KafkaClusterTestKit cluster; private final Properties brokerConfig; -private final Time time = Time.SYSTEM; -private final int[] currentBrokerPorts; -private final String[] currentBrokerLogDirs; -private final boolean hasListenerConfig; +private final Map clientConfigs; -final Map clientConfigs; - -private EmbeddedZookeeper zookeeper = null; -private ListenerName listenerName = new ListenerName("PLAINTEXT"); private KafkaProducer producer; -public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig) { +public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) { this(numBrokers, brokerConfig, Collections.emptyMap()); } public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig, -final Map clientConfigs) { -brokers = new KafkaServer[numBrokers]; -currentBrokerPorts = new int[numBrokers]; -currentBrokerLogDirs = new String[numBrokers]; -this.brokerConfig = brokerConfig; -// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether -// a listener config is defined during initialization in order to know if it's -// safe to override it -hasListenerConfig = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null; + final Properties brokerConfig, + final Map clientConfigs) { +addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers); +try { +KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder( +new TestKitNodes.Builder() +.setCombined(true) +.setNumBrokerNodes(numBrokers) +.setNumControllerNodes(numBrokers) Review Comment: If the quorum controllers are black box to connect, maybe we can hard code the controllers to `3` (or `1` to reduce the resources in testing) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
OmniaGM commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1691268684 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -95,98 +87,60 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; /** - * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for - * integration tests. + * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the + * specified number of brokers and the specified broker properties. This can be used for integration tests and is + * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be + * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data, + * consume data etc. */ public class EmbeddedKafkaCluster { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); -private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); +private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); -// Kafka Config -private final KafkaServer[] brokers; +private final KafkaClusterTestKit cluster; private final Properties brokerConfig; -private final Time time = Time.SYSTEM; -private final int[] currentBrokerPorts; -private final String[] currentBrokerLogDirs; -private final boolean hasListenerConfig; +private final Map clientConfigs; -final Map clientConfigs; - -private EmbeddedZookeeper zookeeper = null; -private ListenerName listenerName = new ListenerName("PLAINTEXT"); private KafkaProducer producer; -public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig) { +public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) { this(numBrokers, brokerConfig, Collections.emptyMap()); } public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig, -final Map clientConfigs) { -brokers = new KafkaServer[numBrokers]; -currentBrokerPorts = new int[numBrokers]; -currentBrokerLogDirs = new String[numBrokers]; -this.brokerConfig = brokerConfig; -// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether -// a listener config is defined during initialization in order to know if it's -// safe to override it -hasListenerConfig = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null; + final Properties brokerConfig, + final Map clientConfigs) { +addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers); +try { +KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder( +new TestKitNodes.Builder() +.setCombined(true) +.setNumBrokerNodes(numBrokers) +.setNumControllerNodes(numBrokers) Review Comment: Do we care about num of controller == num of brokers can't be just hardcoded to 1. Bit concerned about cases where some one set unsuitable number for `numBrokers` not knowing this will impact quorum as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
OmniaGM commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1691268684 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -95,98 +87,60 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; /** - * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for - * integration tests. + * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the + * specified number of brokers and the specified broker properties. This can be used for integration tests and is + * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be + * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data, + * consume data etc. */ public class EmbeddedKafkaCluster { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); -private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); +private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); -// Kafka Config -private final KafkaServer[] brokers; +private final KafkaClusterTestKit cluster; private final Properties brokerConfig; -private final Time time = Time.SYSTEM; -private final int[] currentBrokerPorts; -private final String[] currentBrokerLogDirs; -private final boolean hasListenerConfig; +private final Map clientConfigs; -final Map clientConfigs; - -private EmbeddedZookeeper zookeeper = null; -private ListenerName listenerName = new ListenerName("PLAINTEXT"); private KafkaProducer producer; -public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig) { +public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) { this(numBrokers, brokerConfig, Collections.emptyMap()); } public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig, -final Map clientConfigs) { -brokers = new KafkaServer[numBrokers]; -currentBrokerPorts = new int[numBrokers]; -currentBrokerLogDirs = new String[numBrokers]; -this.brokerConfig = brokerConfig; -// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether -// a listener config is defined during initialization in order to know if it's -// safe to override it -hasListenerConfig = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null; + final Properties brokerConfig, + final Map clientConfigs) { +addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers); +try { +KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder( +new TestKitNodes.Builder() +.setCombined(true) +.setNumBrokerNodes(numBrokers) +.setNumControllerNodes(numBrokers) Review Comment: Do we care about num of controller == num of brokers can't be just hardcoded to 1. Bit concerned about cases where some one set unsuitable number not knowing this will impact quorum as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
chia7712 commented on PR #16599: URL: https://github.com/apache/kafka/pull/16599#issuecomment-2250034366 @C0urante Could you please rebase code to include fix #16688 for one more CI? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
chia7712 commented on PR #16599: URL: https://github.com/apache/kafka/pull/16599#issuecomment-2249611573 The failure will be fixed by #16688 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
chia7712 commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1690292626 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -447,9 +326,9 @@ public void createTopic(String topic, int partitions, int replication, Map topicConfig, Map adminClientConfig) { -if (replication > brokers.length) { +if (replication > cluster.brokers().size()) { Review Comment: > unless it would leave to a regression I'd prefer not to block this PR on figuring that out and save it for a follow-up. Is that alright? make senses to me :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
C0urante commented on PR #16599: URL: https://github.com/apache/kafka/pull/16599#issuecomment-2248666797 Thanks @chia7712 for the review! Pushed a few commits that should hopefully address your comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
C0urante commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1690265976 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -199,149 +153,74 @@ private void doStart() { producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer()); } -public void stopOnlyKafka() { -stop(false, false); -} - -public void stop() { -stop(true, true); -} - -private void stop(boolean deleteLogDirs, boolean stopZK) { -maybeShutDownProducer(); -triggerBrokerShutdown(); -awaitBrokerShutdown(); - -if (deleteLogDirs) -deleteLogDirs(); - -if (stopZK) -stopZK(); -} - -private void maybeShutDownProducer() { -try { -if (producer != null) { -producer.close(); -} -} catch (Exception e) { -log.error("Could not shutdown producer ", e); -throw new RuntimeException("Could not shutdown producer", e); -} -} - -private void triggerBrokerShutdown() { -for (KafkaServer broker : brokers) { -try { -broker.shutdown(); -} catch (Throwable t) { -String msg = String.format("Could not shutdown broker at %s", address(broker)); -log.error(msg, t); -throw new RuntimeException(msg, t); -} -} -} - -private void awaitBrokerShutdown() { -for (KafkaServer broker : brokers) { -try { -broker.awaitShutdown(); -} catch (Throwable t) { -String msg = String.format("Failed while awaiting shutdown of broker at %s", address(broker)); -log.error(msg, t); -throw new RuntimeException(msg, t); -} -} -} - -private void deleteLogDirs() { -for (KafkaServer broker : brokers) { -try { -log.info("Cleaning up kafka log dirs at {}", broker.config().logDirs()); -CoreUtils.delete(broker.config().logDirs()); -} catch (Throwable t) { -String msg = String.format("Could not clean up log dirs for broker at %s", -address(broker)); -log.error(msg, t); -throw new RuntimeException(msg, t); -} -} -} - -private void stopZK() { -try { -zookeeper.shutdown(); -} catch (Throwable t) { -String msg = String.format("Could not shutdown zookeeper at %s", zKConnectString()); -log.error(msg, t); -throw new RuntimeException(msg, t); -} +/** + * Restarts the Kafka brokers. This can be called after {@link #stopTemporarily()}. Note that if the Kafka brokers + * need to be listening on the same ports as earlier, the {@link #brokerConfig} should contain the + * {@link SocketServerConfigs#LISTENERS_CONFIG} property and it should use a fixed non-zero free port. Also note that this is + * only possible when {@code numBrokers} is 1. + */ +public void restart() { +cluster.brokers().values().forEach(BrokerServer::startup); } -private static void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) { -if (!props.containsKey(propertyKey)) { -props.put(propertyKey, propertyValue); -} +/** + * Stop the brokers (and controllers) in the Kafka cluster. This can be used to test Connect's functionality Review Comment: Ah, thanks for the clarification! I made this change in response to [a prior comment](https://github.com/apache/kafka/pull/16599#discussion_r1682557520) that had me thinking that running in combined mode meant it would be impossible to stop brokers separately from controllers. Given this new information I've reverted the commit I made in response to that comment. I think it should be enough (at least for the sake of this migration) to just run in combined mode. We could definitely add tests later on to verify controller connection. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
C0urante commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1690265815 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -199,149 +153,74 @@ private void doStart() { producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer()); } -public void stopOnlyKafka() { -stop(false, false); -} - -public void stop() { -stop(true, true); -} - -private void stop(boolean deleteLogDirs, boolean stopZK) { -maybeShutDownProducer(); -triggerBrokerShutdown(); -awaitBrokerShutdown(); - -if (deleteLogDirs) -deleteLogDirs(); - -if (stopZK) -stopZK(); -} - -private void maybeShutDownProducer() { -try { -if (producer != null) { -producer.close(); -} -} catch (Exception e) { -log.error("Could not shutdown producer ", e); -throw new RuntimeException("Could not shutdown producer", e); -} -} - -private void triggerBrokerShutdown() { -for (KafkaServer broker : brokers) { -try { -broker.shutdown(); -} catch (Throwable t) { -String msg = String.format("Could not shutdown broker at %s", address(broker)); -log.error(msg, t); -throw new RuntimeException(msg, t); -} -} -} - -private void awaitBrokerShutdown() { -for (KafkaServer broker : brokers) { -try { -broker.awaitShutdown(); -} catch (Throwable t) { -String msg = String.format("Failed while awaiting shutdown of broker at %s", address(broker)); -log.error(msg, t); -throw new RuntimeException(msg, t); -} -} -} - -private void deleteLogDirs() { -for (KafkaServer broker : brokers) { -try { -log.info("Cleaning up kafka log dirs at {}", broker.config().logDirs()); -CoreUtils.delete(broker.config().logDirs()); -} catch (Throwable t) { -String msg = String.format("Could not clean up log dirs for broker at %s", -address(broker)); -log.error(msg, t); -throw new RuntimeException(msg, t); -} -} -} - -private void stopZK() { -try { -zookeeper.shutdown(); -} catch (Throwable t) { -String msg = String.format("Could not shutdown zookeeper at %s", zKConnectString()); -log.error(msg, t); -throw new RuntimeException(msg, t); -} +/** + * Restarts the Kafka brokers. This can be called after {@link #stopTemporarily()}. Note that if the Kafka brokers + * need to be listening on the same ports as earlier, the {@link #brokerConfig} should contain the + * {@link SocketServerConfigs#LISTENERS_CONFIG} property and it should use a fixed non-zero free port. Also note that this is + * only possible when {@code numBrokers} is 1. + */ +public void restart() { +cluster.brokers().values().forEach(BrokerServer::startup); } -private static void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) { -if (!props.containsKey(propertyKey)) { -props.put(propertyKey, propertyValue); -} +/** + * Stop the brokers (and controllers) in the Kafka cluster. This can be used to test Connect's functionality + * when the backing Kafka cluster goes offline. + */ +public void stopTemporarily() { +cluster.brokers().values().forEach(BrokerServer::shutdown); +cluster.brokers().values().forEach(BrokerServer::awaitShutdown); } -private String createLogDir() { -try { -return Files.createTempDirectory(getClass().getSimpleName()).toString(); -} catch (IOException e) { -log.error("Unable to create temporary log directory", e); -throw new ConnectException("Unable to create temporary log directory", e); +public void stop() { +AtomicReference shutdownFailure = new AtomicReference<>(); +Utils.closeQuietly(producer, "producer for embedded Kafka cluster", shutdownFailure); +Utils.closeQuietly(cluster, "embedded Kafka cluster", shutdownFailure); +if (shutdownFailure.get() != null) { +throw new ConnectException("Failed to shut down producer / embedded Kafka cluster", shutdownFailure.get()); } } public String bootstrapServers() { -return Arrays.stream(brokers) -.map(this::address) -.collect(Collectors.joining(",")); -} - -public String address(KafkaServer server) { -fina
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
chia7712 commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1689474552 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -447,9 +326,9 @@ public void createTopic(String topic, int partitions, int replication, Map topicConfig, Map adminClientConfig) { -if (replication > brokers.length) { +if (replication > cluster.brokers().size()) { Review Comment: Is this a kind of fail fast? otherwise, the admin will get the same error from server. ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -199,149 +153,74 @@ private void doStart() { producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer()); } -public void stopOnlyKafka() { -stop(false, false); -} - -public void stop() { -stop(true, true); -} - -private void stop(boolean deleteLogDirs, boolean stopZK) { -maybeShutDownProducer(); -triggerBrokerShutdown(); -awaitBrokerShutdown(); - -if (deleteLogDirs) -deleteLogDirs(); - -if (stopZK) -stopZK(); -} - -private void maybeShutDownProducer() { -try { -if (producer != null) { -producer.close(); -} -} catch (Exception e) { -log.error("Could not shutdown producer ", e); -throw new RuntimeException("Could not shutdown producer", e); -} -} - -private void triggerBrokerShutdown() { -for (KafkaServer broker : brokers) { -try { -broker.shutdown(); -} catch (Throwable t) { -String msg = String.format("Could not shutdown broker at %s", address(broker)); -log.error(msg, t); -throw new RuntimeException(msg, t); -} -} -} - -private void awaitBrokerShutdown() { -for (KafkaServer broker : brokers) { -try { -broker.awaitShutdown(); -} catch (Throwable t) { -String msg = String.format("Failed while awaiting shutdown of broker at %s", address(broker)); -log.error(msg, t); -throw new RuntimeException(msg, t); -} -} -} - -private void deleteLogDirs() { -for (KafkaServer broker : brokers) { -try { -log.info("Cleaning up kafka log dirs at {}", broker.config().logDirs()); -CoreUtils.delete(broker.config().logDirs()); -} catch (Throwable t) { -String msg = String.format("Could not clean up log dirs for broker at %s", -address(broker)); -log.error(msg, t); -throw new RuntimeException(msg, t); -} -} -} - -private void stopZK() { -try { -zookeeper.shutdown(); -} catch (Throwable t) { -String msg = String.format("Could not shutdown zookeeper at %s", zKConnectString()); -log.error(msg, t); -throw new RuntimeException(msg, t); -} +/** + * Restarts the Kafka brokers. This can be called after {@link #stopTemporarily()}. Note that if the Kafka brokers + * need to be listening on the same ports as earlier, the {@link #brokerConfig} should contain the + * {@link SocketServerConfigs#LISTENERS_CONFIG} property and it should use a fixed non-zero free port. Also note that this is + * only possible when {@code numBrokers} is 1. + */ +public void restart() { +cluster.brokers().values().forEach(BrokerServer::startup); } -private static void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) { -if (!props.containsKey(propertyKey)) { -props.put(propertyKey, propertyValue); -} +/** + * Stop the brokers (and controllers) in the Kafka cluster. This can be used to test Connect's functionality + * when the backing Kafka cluster goes offline. + */ +public void stopTemporarily() { +cluster.brokers().values().forEach(BrokerServer::shutdown); +cluster.brokers().values().forEach(BrokerServer::awaitShutdown); } -private String createLogDir() { -try { -return Files.createTempDirectory(getClass().getSimpleName()).toString(); -} catch (IOException e) { -log.error("Unable to create temporary log directory", e); -throw new ConnectException("Unable to create temporary log directory", e); +public void stop() { +AtomicReference shutdownFailure = new AtomicReference<>(); +Utils.closeQuietly(producer, "producer for embedded Kafka cluster", shutdownFailure); +Utils.closeQuietly(cluster, "embedde
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
chia7712 commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1689006610 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -95,98 +88,60 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; /** - * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for - * integration tests. + * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the + * specified number of brokers and the specified broker properties. This can be used for integration tests and is + * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be + * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data, + * consume data etc. */ public class EmbeddedKafkaCluster { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); -private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); +private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); -// Kafka Config -private final KafkaServer[] brokers; +private final KafkaClusterTestKit cluster; private final Properties brokerConfig; -private final Time time = Time.SYSTEM; -private final int[] currentBrokerPorts; -private final String[] currentBrokerLogDirs; -private final boolean hasListenerConfig; +private final Map clientConfigs; -final Map clientConfigs; - -private EmbeddedZookeeper zookeeper = null; -private ListenerName listenerName = new ListenerName("PLAINTEXT"); private KafkaProducer producer; -public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig) { +public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) { this(numBrokers, brokerConfig, Collections.emptyMap()); } public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig, -final Map clientConfigs) { -brokers = new KafkaServer[numBrokers]; -currentBrokerPorts = new int[numBrokers]; -currentBrokerLogDirs = new String[numBrokers]; -this.brokerConfig = brokerConfig; -// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether -// a listener config is defined during initialization in order to know if it's -// safe to override it -hasListenerConfig = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null; + final Properties brokerConfig, + final Map clientConfigs) { +addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers); +try { +KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder( Review Comment: @C0urante Thanks for your response. I move the discussion to https://issues.apache.org/jira/browse/KAFKA-17174 to avoid unnecessary noise in this PR :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
C0urante commented on PR #16599: URL: https://github.com/apache/kafka/pull/16599#issuecomment-2245782479 @mimaison It looks like my latest commits went through. Would you mind giving this another round when you have a chance? Happy to continue the conversation about the `ClusterTestExtensions` API too, but figure that that can take place concurrently with a review of the latest changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
C0urante commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1688407540 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -95,98 +88,60 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; /** - * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for - * integration tests. + * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the + * specified number of brokers and the specified broker properties. This can be used for integration tests and is + * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be + * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data, + * consume data etc. */ public class EmbeddedKafkaCluster { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); -private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); +private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); -// Kafka Config -private final KafkaServer[] brokers; +private final KafkaClusterTestKit cluster; private final Properties brokerConfig; -private final Time time = Time.SYSTEM; -private final int[] currentBrokerPorts; -private final String[] currentBrokerLogDirs; -private final boolean hasListenerConfig; +private final Map clientConfigs; -final Map clientConfigs; - -private EmbeddedZookeeper zookeeper = null; -private ListenerName listenerName = new ListenerName("PLAINTEXT"); private KafkaProducer producer; -public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig) { +public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) { this(numBrokers, brokerConfig, Collections.emptyMap()); } public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig, -final Map clientConfigs) { -brokers = new KafkaServer[numBrokers]; -currentBrokerPorts = new int[numBrokers]; -currentBrokerLogDirs = new String[numBrokers]; -this.brokerConfig = brokerConfig; -// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether -// a listener config is defined during initialization in order to know if it's -// safe to override it -hasListenerConfig = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null; + final Properties brokerConfig, + final Map clientConfigs) { +addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers); +try { +KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder( Review Comment: I agree that a unified testing style will be better in the long run, but we're pretty far away from that with Connect's integration tests. For the vast majority of our tests, we can't easily switch over to the `ClusterTestExtensions` API because our `EmbeddedConnectCluster` automatically spins up its own `EmbeddedKafkaCluster`. If we used the annotation-based API, we'd have to modify our `EmbeddedConnectCluster` class to not do that and to instead accept a `ClusterInstance` that's been created and possibly started out-of-band, which would actually be less convenient than our current embedded cluster API and IMO is not a path we should pursue. I also don't think that adopting the `ClusterTestExtensions` API piecemeal in Connect (like the proposal in KAFKA-17174 to migrate the `OffsetsApiIntegrationTest`) without a long-term strategy to migrate our entire collection of integration tests is a good idea. Right now Connect uses a consistent style for our all of our integration tests and fragmenting that style will just make life harder for the people actually writing and maintaining these tests. TL;DR: While I'm also not opposed to adopting the `ClusterTestExtensions` API in Conne
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
C0urante commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1688407540 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -95,98 +88,60 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; /** - * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for - * integration tests. + * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the + * specified number of brokers and the specified broker properties. This can be used for integration tests and is + * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be + * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data, + * consume data etc. */ public class EmbeddedKafkaCluster { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); -private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); +private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); -// Kafka Config -private final KafkaServer[] brokers; +private final KafkaClusterTestKit cluster; private final Properties brokerConfig; -private final Time time = Time.SYSTEM; -private final int[] currentBrokerPorts; -private final String[] currentBrokerLogDirs; -private final boolean hasListenerConfig; +private final Map clientConfigs; -final Map clientConfigs; - -private EmbeddedZookeeper zookeeper = null; -private ListenerName listenerName = new ListenerName("PLAINTEXT"); private KafkaProducer producer; -public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig) { +public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) { this(numBrokers, brokerConfig, Collections.emptyMap()); } public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig, -final Map clientConfigs) { -brokers = new KafkaServer[numBrokers]; -currentBrokerPorts = new int[numBrokers]; -currentBrokerLogDirs = new String[numBrokers]; -this.brokerConfig = brokerConfig; -// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether -// a listener config is defined during initialization in order to know if it's -// safe to override it -hasListenerConfig = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null; + final Properties brokerConfig, + final Map clientConfigs) { +addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers); +try { +KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder( Review Comment: I agree that a unified testing style will be better in the long run, but we're pretty far away from that with Connect's integration tests. For the vast majority of our tests, we can't easily switch over to the `ClusterTestExtensions` API because our `EmbeddedConnectCluster` automatically spins up its own `EmbeddedKafkaCluster`. If we used the annotation-based API, we'd have to modify our `EmbeddedConnectCluster` class to not create its own embedded Kafka cluster and to instead accept a `ClusterInstance` that's been created and possibly started out-of-band, which would actually be less convenient than our current embedded cluster API and IMO is not a path we should pursue. I also don't think that adopting the `ClusterTestExtensions` API piecemeal in Connect (like the proposal in KAFKA-17174 to migrate the `OffsetsApiIntegrationTest`) without a long-term strategy to migrate our entire collection of integration tests is a good idea. Right now Connect uses a consistent style for our all of our integration tests and fragmenting that style will just make life harder for the people actually writing and maintaining these tests. TL;DR: While I'm also not opposed to adopting the `Clust
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
chia7712 commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1688198603 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -95,98 +88,60 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; /** - * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for - * integration tests. + * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the + * specified number of brokers and the specified broker properties. This can be used for integration tests and is + * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be + * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data, + * consume data etc. */ public class EmbeddedKafkaCluster { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); -private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); +private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); -// Kafka Config -private final KafkaServer[] brokers; +private final KafkaClusterTestKit cluster; private final Properties brokerConfig; -private final Time time = Time.SYSTEM; -private final int[] currentBrokerPorts; -private final String[] currentBrokerLogDirs; -private final boolean hasListenerConfig; +private final Map clientConfigs; -final Map clientConfigs; - -private EmbeddedZookeeper zookeeper = null; -private ListenerName listenerName = new ListenerName("PLAINTEXT"); private KafkaProducer producer; -public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig) { +public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) { this(numBrokers, brokerConfig, Collections.emptyMap()); } public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig, -final Map clientConfigs) { -brokers = new KafkaServer[numBrokers]; -currentBrokerPorts = new int[numBrokers]; -currentBrokerLogDirs = new String[numBrokers]; -this.brokerConfig = brokerConfig; -// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether -// a listener config is defined during initialization in order to know if it's -// safe to override it -hasListenerConfig = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null; + final Properties brokerConfig, + final Map clientConfigs) { +addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers); +try { +KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder( Review Comment: I just want to apply the new test infra to all modules. That can unify the code base and all modules can derive the benefits when we improve it. BTW, I don't want to block this PR, but it seems to me some tests in connect module can leverage the new test infra first. We can work on them concurrently :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
mimaison commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1687978245 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -95,98 +88,60 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; /** - * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for - * integration tests. + * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the + * specified number of brokers and the specified broker properties. This can be used for integration tests and is + * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be + * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data, + * consume data etc. */ public class EmbeddedKafkaCluster { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); -private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); +private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); -// Kafka Config -private final KafkaServer[] brokers; +private final KafkaClusterTestKit cluster; private final Properties brokerConfig; -private final Time time = Time.SYSTEM; -private final int[] currentBrokerPorts; -private final String[] currentBrokerLogDirs; -private final boolean hasListenerConfig; +private final Map clientConfigs; -final Map clientConfigs; - -private EmbeddedZookeeper zookeeper = null; -private ListenerName listenerName = new ListenerName("PLAINTEXT"); private KafkaProducer producer; -public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig) { +public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) { this(numBrokers, brokerConfig, Collections.emptyMap()); } public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig, -final Map clientConfigs) { -brokers = new KafkaServer[numBrokers]; -currentBrokerPorts = new int[numBrokers]; -currentBrokerLogDirs = new String[numBrokers]; -this.brokerConfig = brokerConfig; -// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether -// a listener config is defined during initialization in order to know if it's -// safe to override it -hasListenerConfig = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null; + final Properties brokerConfig, + final Map clientConfigs) { +addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers); +try { +KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder( Review Comment: Thanks for the heads up! There are so many refactorings in progress, it's hard to keep up. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
chia7712 commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1687899344 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -95,98 +88,60 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; /** - * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for - * integration tests. + * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the + * specified number of brokers and the specified broker properties. This can be used for integration tests and is + * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be + * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data, + * consume data etc. */ public class EmbeddedKafkaCluster { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); -private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); +private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); -// Kafka Config -private final KafkaServer[] brokers; +private final KafkaClusterTestKit cluster; private final Properties brokerConfig; -private final Time time = Time.SYSTEM; -private final int[] currentBrokerPorts; -private final String[] currentBrokerLogDirs; -private final boolean hasListenerConfig; +private final Map clientConfigs; -final Map clientConfigs; - -private EmbeddedZookeeper zookeeper = null; -private ListenerName listenerName = new ListenerName("PLAINTEXT"); private KafkaProducer producer; -public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig) { +public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) { this(numBrokers, brokerConfig, Collections.emptyMap()); } public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig, -final Map clientConfigs) { -brokers = new KafkaServer[numBrokers]; -currentBrokerPorts = new int[numBrokers]; -currentBrokerLogDirs = new String[numBrokers]; -this.brokerConfig = brokerConfig; -// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether -// a listener config is defined during initialization in order to know if it's -// safe to override it -hasListenerConfig = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null; + final Properties brokerConfig, + final Map clientConfigs) { +addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers); +try { +KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder( Review Comment: For another benefit: #16499 introduces thread-leak detection for method level, and it is applied automatically if we adopt `ClusterTestExtensions` :smile: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
chia7712 commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1685882826 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -95,98 +88,60 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; /** - * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for - * integration tests. + * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the + * specified number of brokers and the specified broker properties. This can be used for integration tests and is + * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be + * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data, + * consume data etc. */ public class EmbeddedKafkaCluster { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); -private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); +private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); -// Kafka Config -private final KafkaServer[] brokers; +private final KafkaClusterTestKit cluster; private final Properties brokerConfig; -private final Time time = Time.SYSTEM; -private final int[] currentBrokerPorts; -private final String[] currentBrokerLogDirs; -private final boolean hasListenerConfig; +private final Map clientConfigs; -final Map clientConfigs; - -private EmbeddedZookeeper zookeeper = null; -private ListenerName listenerName = new ListenerName("PLAINTEXT"); private KafkaProducer producer; -public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig) { +public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) { this(numBrokers, brokerConfig, Collections.emptyMap()); } public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig, -final Map clientConfigs) { -brokers = new KafkaServer[numBrokers]; -currentBrokerPorts = new int[numBrokers]; -currentBrokerLogDirs = new String[numBrokers]; -this.brokerConfig = brokerConfig; -// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether -// a listener config is defined during initialization in order to know if it's -// safe to override it -hasListenerConfig = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null; + final Properties brokerConfig, + final Map clientConfigs) { +addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers); +try { +KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder( Review Comment: I file https://issues.apache.org/jira/browse/KAFKA-17174 to address above comments. Please feel free to raise objection. thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
chia7712 commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1685723572 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -95,98 +88,60 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; /** - * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for - * integration tests. + * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the + * specified number of brokers and the specified broker properties. This can be used for integration tests and is + * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be + * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data, + * consume data etc. */ public class EmbeddedKafkaCluster { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); -private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); +private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); -// Kafka Config -private final KafkaServer[] brokers; +private final KafkaClusterTestKit cluster; private final Properties brokerConfig; -private final Time time = Time.SYSTEM; -private final int[] currentBrokerPorts; -private final String[] currentBrokerLogDirs; -private final boolean hasListenerConfig; +private final Map clientConfigs; -final Map clientConfigs; - -private EmbeddedZookeeper zookeeper = null; -private ListenerName listenerName = new ListenerName("PLAINTEXT"); private KafkaProducer producer; -public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig) { +public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) { this(numBrokers, brokerConfig, Collections.emptyMap()); } public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig, -final Map clientConfigs) { -brokers = new KafkaServer[numBrokers]; -currentBrokerPorts = new int[numBrokers]; -currentBrokerLogDirs = new String[numBrokers]; -this.brokerConfig = brokerConfig; -// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether -// a listener config is defined during initialization in order to know if it's -// safe to override it -hasListenerConfig = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null; + final Properties brokerConfig, + final Map clientConfigs) { +addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers); +try { +KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder( Review Comment: Or we can migrate the tests having few uses of `EmbeddedKafkaCluster` to `ClusterInstance`. For instance: `OffsetsApiIntegrationTest`. The other tests sticking on `EmbeddedKafkaCluster` can be migrated later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
chia7712 commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1685709519 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -95,98 +88,60 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; /** - * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for - * integration tests. + * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the + * specified number of brokers and the specified broker properties. This can be used for integration tests and is + * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be + * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data, + * consume data etc. */ public class EmbeddedKafkaCluster { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); -private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); +private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); -// Kafka Config -private final KafkaServer[] brokers; +private final KafkaClusterTestKit cluster; private final Properties brokerConfig; -private final Time time = Time.SYSTEM; -private final int[] currentBrokerPorts; -private final String[] currentBrokerLogDirs; -private final boolean hasListenerConfig; +private final Map clientConfigs; -final Map clientConfigs; - -private EmbeddedZookeeper zookeeper = null; -private ListenerName listenerName = new ListenerName("PLAINTEXT"); private KafkaProducer producer; -public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig) { +public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) { this(numBrokers, brokerConfig, Collections.emptyMap()); } public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig, -final Map clientConfigs) { -brokers = new KafkaServer[numBrokers]; -currentBrokerPorts = new int[numBrokers]; -currentBrokerLogDirs = new String[numBrokers]; -this.brokerConfig = brokerConfig; -// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether -// a listener config is defined during initialization in order to know if it's -// safe to override it -hasListenerConfig = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null; + final Properties brokerConfig, + final Map clientConfigs) { +addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers); +try { +KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder( Review Comment: Is it possible to leverage `ClusterInstance` instead of using `KafkaClusterTestKit` directly? We had introduced the new test infra to some storage tests (https://github.com/apache/kafka/blob/defcbb51ee98ae766e4fcf41ed7631ef1b0cdd15/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java#L50) Leveraging the `ClusterInstance` can have following benefits: 1. unify the test style 2. easy to test for all types (zk, kraft, co-kraft) 3. easy to migrate -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
C0urante commented on PR #16599: URL: https://github.com/apache/kafka/pull/16599#issuecomment-2239959002 Well, I tried to push a new commit at least. Looks like airplane wifi is strong enough to publish GitHub comments but not enough to push commits. Will keep retrying until the commit lands 🥴 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
C0urante commented on PR #16599: URL: https://github.com/apache/kafka/pull/16599#issuecomment-2239932531 Thanks Mickael! Pushed a new commit, ready for another round when you have a moment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
C0urante commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1684858189 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java: ## @@ -694,12 +701,29 @@ public void testTasksFailOnInabilityToFence() throws Exception { )).all().get(); } -StartAndStopLatch connectorStart = connectorAndTaskStart(tasksMax); - log.info("Bringing up connector with fresh slate; fencing should not be necessary"); connect.configureConnector(CONNECTOR_NAME, props); -assertConnectorStarted(connectorStart); -// Verify that the connector and its tasks have been able to start successfully + +// Hack: There is a small chance that our recent ACL updates for the connector have +// not yet been propagated across the entire Kafka cluster, and that our connector +// will fail on startup when it tries to list the end offsets of the worker's offsets topic +// So, we implement some retry logic here to add a layer of resiliency in that case +waitForCondition( +() -> { +ConnectorStateInfo status = connect.connectorStatus(CONNECTOR_NAME); +if ("RUNNING".equals(status.connector().state())) { +return true; +} else if ("FAILED".equals(status.connector().state())) { +log.debug("Restarting failed connector {}", CONNECTOR_NAME); +connect.restartConnector(CONNECTOR_NAME); +} +return false; +}, +30_000, Review Comment: They're the same value, but we're retrying for slightly different reasons; delays in ACL propagation won't necessarily be caught during pre-flight connector config validation. I've pulled this out into a separate `ACL_PROPAGATION_TIMEOUT_MS` value; LMKWYT. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
C0urante commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1684857716 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -95,98 +88,60 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; /** - * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for - * integration tests. + * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the + * specified number of brokers and the specified broker properties. This can be used for integration tests and is + * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be + * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data, + * consume data etc. */ public class EmbeddedKafkaCluster { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); -private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); +private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); -// Kafka Config -private final KafkaServer[] brokers; +private final KafkaClusterTestKit cluster; private final Properties brokerConfig; -private final Time time = Time.SYSTEM; -private final int[] currentBrokerPorts; -private final String[] currentBrokerLogDirs; -private final boolean hasListenerConfig; +private final Map clientConfigs; -final Map clientConfigs; - -private EmbeddedZookeeper zookeeper = null; -private ListenerName listenerName = new ListenerName("PLAINTEXT"); private KafkaProducer producer; -public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig) { +public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) { this(numBrokers, brokerConfig, Collections.emptyMap()); } public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig, -final Map clientConfigs) { -brokers = new KafkaServer[numBrokers]; -currentBrokerPorts = new int[numBrokers]; -currentBrokerLogDirs = new String[numBrokers]; -this.brokerConfig = brokerConfig; -// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether -// a listener config is defined during initialization in order to know if it's -// safe to override it -hasListenerConfig = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null; + final Properties brokerConfig, + final Map clientConfigs) { +addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers); +try { +KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder( +new TestKitNodes.Builder() +.setCombined(true) Review Comment: Ah, interesting! I think it's possible to use a separate controller node/cluster but it's unnecessary for the purposes of our tests. IMO we should instead just rename the `stopOnlyBrokers` and `restartOnlyBrokers` methods to `stop` and `restart`, respectively. For the Connect integration tests, we largely treat Kafka like a black box that's usually on and, for some scenarios, may be turned off (or generally unavailable). FWICT we don't really do anything that requires the kind of granularity where we'd distinguish between brokers being unavailable and controllers being unavailable. It's also a little tricky to use separate controllers and brokers because we still have to be able to turn our embedded Kafka cluster off and back on again with the same port, which becomes difficult when you have to specify a fixed port to use across restarts for brokers only. You end up having to abuse the `setPerServerProperties` API to accomplish that, and while it's certainly possible, it is a bit ugly to expose that cleanly in our internal `EmbeddedKafkaCluster` API. Thoughts? ## connect/
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
mimaison commented on code in PR #16599: URL: https://github.com/apache/kafka/pull/16599#discussion_r1682557520 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -95,98 +88,60 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; /** - * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for - * integration tests. + * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the + * specified number of brokers and the specified broker properties. This can be used for integration tests and is + * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be + * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data, + * consume data etc. */ public class EmbeddedKafkaCluster { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); -private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); +private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); -// Kafka Config -private final KafkaServer[] brokers; +private final KafkaClusterTestKit cluster; private final Properties brokerConfig; -private final Time time = Time.SYSTEM; -private final int[] currentBrokerPorts; -private final String[] currentBrokerLogDirs; -private final boolean hasListenerConfig; +private final Map clientConfigs; -final Map clientConfigs; - -private EmbeddedZookeeper zookeeper = null; -private ListenerName listenerName = new ListenerName("PLAINTEXT"); private KafkaProducer producer; -public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig) { +public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) { this(numBrokers, brokerConfig, Collections.emptyMap()); } public EmbeddedKafkaCluster(final int numBrokers, -final Properties brokerConfig, -final Map clientConfigs) { -brokers = new KafkaServer[numBrokers]; -currentBrokerPorts = new int[numBrokers]; -currentBrokerLogDirs = new String[numBrokers]; -this.brokerConfig = brokerConfig; -// Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether -// a listener config is defined during initialization in order to know if it's -// safe to override it -hasListenerConfig = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null; + final Properties brokerConfig, + final Map clientConfigs) { +addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers); +try { +KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder( +new TestKitNodes.Builder() +.setCombined(true) Review Comment: Have you tried having a separate controller? My concern is that we have a method `stopOnlyBrokers()` which would stop controllers too in combined mode. ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -748,9 +625,10 @@ public KafkaProducer createProducer(Map producer return producer; } -private static void putIfAbsent(final Map props, final String propertyKey, final Object propertyValue) { -if (!props.containsKey(propertyKey)) { -props.put(propertyKey, propertyValue); -} +private void addDefaultBrokerPropsIfAbsent(Properties brokerConfig, int numBrokers) { +brokerConfig.putIfAbsent(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, "true"); Review Comment: Do we need that? This is the default value ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java: ## @@ -694,12 +701,29 @@ public void testTasksFailOnInabilityToFence() throws Exception { )).all().get(); } -StartAndStopLatch co
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
C0urante commented on PR #16599: URL: https://github.com/apache/kafka/pull/16599#issuecomment-2233996052 Found one more flaky test. It doesn't seem like that flakiness is inherent with KRaft mode, but it may be exacerbated by it. Pushed a fix. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
C0urante commented on PR #16599: URL: https://github.com/apache/kafka/pull/16599#issuecomment-2231872728 Spoke too soon, but should be good to go now. The one current failing MM2 test appears to be due to the bug that's fixed by https://github.com/apache/kafka/pull/16598, so it should be safe to ignore. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
yashmayya closed pull request #13375: KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode URL: https://github.com/apache/kafka/pull/13375 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
C0urante commented on PR #16599: URL: https://github.com/apache/kafka/pull/16599#issuecomment-2229780003 Okay, this should be good to go--verified the build locally and ran the SSL MM2 integration tests; hoping that's enough coverage for now, and that Jenkins will confirm the rest. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
C0urante commented on PR #13375: URL: https://github.com/apache/kafka/pull/13375#issuecomment-2229508987 I've published https://github.com/apache/kafka/pull/16599, which is just a squash+rebase of this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
yashmayya commented on PR #13375: URL: https://github.com/apache/kafka/pull/13375#issuecomment-2140247839 @mdedetrich apologies for the late response, I didn't get notified for your comment oddly enough. Please feel free to take over, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode [kafka]
mdedetrich commented on PR #13375: URL: https://github.com/apache/kafka/pull/13375#issuecomment-2066602056 @yashmayya Are you still working on this to get it over the finish line or is it okay for me to take over? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org