C0urante commented on code in PR #13375: URL: https://github.com/apache/kafka/pull/13375#discussion_r1145563634
########## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ##########

```diff
@@ -187,120 +146,73 @@ private void doStart() {
         producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
     }
 
-    public void stopOnlyKafka() {
-        stop(false, false);
-    }
-
-    public void stop() {
-        stop(true, true);
-    }
-
-    private void stop(boolean deleteLogDirs, boolean stopZK) {
-        try {
-            if (producer != null) {
-                producer.close();
-            }
-        } catch (Exception e) {
-            log.error("Could not shutdown producer ", e);
-            throw new RuntimeException("Could not shutdown producer", e);
-        }
-
-        for (KafkaServer broker : brokers) {
-            try {
-                broker.shutdown();
-            } catch (Throwable t) {
-                String msg = String.format("Could not shutdown broker at %s", address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-
-        if (deleteLogDirs) {
-            for (KafkaServer broker : brokers) {
-                try {
-                    log.info("Cleaning up kafka log dirs at {}", broker.config().logDirs());
-                    CoreUtils.delete(broker.config().logDirs());
-                } catch (Throwable t) {
-                    String msg = String.format("Could not clean up log dirs for broker at %s",
-                            address(broker));
-                    log.error(msg, t);
-                    throw new RuntimeException(msg, t);
-                }
-            }
-        }
-
-        try {
-            if (stopZK) {
-                zookeeper.shutdown();
-            }
-        } catch (Throwable t) {
-            String msg = String.format("Could not shutdown zookeeper at %s", zKConnectString());
-            log.error(msg, t);
-            throw new RuntimeException(msg, t);
+    /**
+     * Restarts the Kafka brokers. This can be called after {@link #stopOnlyBrokers()}. Note that if the Kafka brokers
+     * need to be listening on the same ports as earlier, the {@link #brokerConfig} should contain the
+     * {@link KafkaConfig#ListenersProp} property and it should use a fixed non-zero free port.
```

Review Comment:
If we decide to keep this strategy, we should also note here that overriding the listeners property can only be done for single-node Kafka clusters.

########## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ##########

```diff
@@ -187,120 +146,73 @@ private void doStart() {
+    public void restartOnlyBrokers() {
+        for (BrokerServer broker : cluster.brokers().values()) {
+            broker.startup();
         }
     }
 
-    private static void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) {
-        if (!props.containsKey(propertyKey)) {
-            props.put(propertyKey, propertyValue);
+    /**
+     * Stop only the Kafka brokers (and not the KRaft controllers). This can be used to test Connect's functionality
+     * when the backing Kafka cluster goes offline.
+     */
+    public void stopOnlyBrokers() {
+        for (BrokerServer broker : cluster.brokers().values()) {
+            broker.shutdown();
+            broker.awaitShutdown();
         }
     }
 
-    private String createLogDir() {
+    public void stop() {
+        if (producer != null) {
+            producer.close();
```

Review Comment:
This skips the rest of the shutdown process if we fail to close the producer for some reason. Seems like a job for `Utils::closeQuietly`:

```java
AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
Utils.closeQuietly(producer, "producer for embedded Kafka cluster", shutdownFailure);
Utils.closeQuietly(cluster, "test Kafka cluster", shutdownFailure);
if (shutdownFailure.get() != null)
    throw new ConnectException("Failed to shut down embedded Kafka cluster", shutdownFailure.get());
```

########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java: ##########

```diff
@@ -67,12 +68,21 @@ public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi
      * enable ACL on brokers.
      */
     protected static void enableAclAuthorizer(Properties brokerProps) {
-        brokerProps.put("authorizer.class.name", "kafka.security.authorizer.AclAuthorizer");
-        brokerProps.put("sasl.enabled.mechanisms", "PLAIN");
-        brokerProps.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
-        brokerProps.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
-        brokerProps.put("listeners", "SASL_PLAINTEXT://localhost:0");
-        brokerProps.put("listener.name.sasl_plaintext.plain.sasl.jaas.config",
+        brokerProps.put(KafkaConfig.AuthorizerClassNameProp(), "org.apache.kafka.metadata.authorizer.StandardAuthorizer");
+        brokerProps.put(KafkaConfig.SaslEnabledMechanismsProp(), "PLAIN");
+        brokerProps.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp(), "PLAIN");
+        brokerProps.put(KafkaConfig.SaslMechanismControllerProtocolProp(), "PLAIN");
+        brokerProps.put(KafkaConfig.ListenersProp(), "EXTERNAL://localhost:0,CONTROLLER://localhost:0");
+        brokerProps.put(KafkaConfig.InterBrokerListenerNameProp(), "EXTERNAL");
+        brokerProps.put(KafkaConfig.ControllerListenerNamesProp(), "CONTROLLER");
+        brokerProps.put(KafkaConfig.ListenerSecurityProtocolMapProp(), "CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT");
+        brokerProps.put("listener.name.external.plain.sasl.jaas.config",
+                "org.apache.kafka.common.security.plain.PlainLoginModule required " +
+                "username=\"super\" " +
+                "password=\"super_pwd\" " +
+                "user_connector=\"connector_pwd\" " +
+                "user_super=\"super_pwd\";");
```

Review Comment:
Same thought RE pulling this string value out into a variable.
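For illustration, the extraction could look like the sketch below; the constant name `PLAIN_JAAS_CONFIG` is hypothetical (the PR leaves the string inline), and the same constant would also cover the matching comment on `ExactlyOnceSourceIntegrationTest` further down:

```java
// Hypothetical constant; the PR itself does not introduce this name.
private static final String PLAIN_JAAS_CONFIG =
        "org.apache.kafka.common.security.plain.PlainLoginModule required " +
        "username=\"super\" " +
        "password=\"super_pwd\" " +
        "user_connector=\"connector_pwd\" " +
        "user_super=\"super_pwd\";";

protected static void enableAclAuthorizer(Properties brokerProps) {
    // ... other broker properties as in the diff above ...
    brokerProps.put("listener.name.external.plain.sasl.jaas.config", PLAIN_JAAS_CONFIG);
    brokerProps.put("listener.name.controller.plain.sasl.jaas.config", PLAIN_JAAS_CONFIG);
}
```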
########## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ##########

```diff
@@ -187,120 +146,73 @@ private void doStart() {
+    public void restartOnlyBrokers() {
+        for (BrokerServer broker : cluster.brokers().values()) {
+            broker.startup();
```

Review Comment:
Nit: can be a one-liner:

```java
cluster.brokers().values().forEach(BrokerServer::startup);
```

########## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ##########

```diff
@@ -89,96 +82,62 @@ import static org.junit.Assert.assertFalse;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
     private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); 
+    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120);
 
-    // Kafka Config
-    private final KafkaServer[] brokers;
+    private final KafkaClusterTestKit cluster;
     private final Properties brokerConfig;
-    private final Time time = new MockTime();
-    private final int[] currentBrokerPorts;
-    private final String[] currentBrokerLogDirs;
-    private final boolean hasListenerConfig;
-
-    final Map<String, String> clientConfigs;
-
-    private EmbeddedZookeeper zookeeper = null;
-    private ListenerName listenerName = new ListenerName("PLAINTEXT");
+    private final Map<String, String> clientConfigs;
 
     private KafkaProducer<byte[], byte[]> producer;
 
-    public EmbeddedKafkaCluster(final int numBrokers,
-                                final Properties brokerConfig) {
+    public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) {
         this(numBrokers, brokerConfig, Collections.emptyMap());
     }
 
     public EmbeddedKafkaCluster(final int numBrokers,
-                                final Properties brokerConfig,
-                                final Map<String, String> clientConfigs) {
-        brokers = new KafkaServer[numBrokers];
-        currentBrokerPorts = new int[numBrokers];
-        currentBrokerLogDirs = new String[numBrokers];
+                                final Properties brokerConfig,
+                                final Map<String, String> clientConfigs) {
+        addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+        try {
+            KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder(
+                    new TestKitNodes.Builder()
+                            .setCoResident(true)
+                            .setNumBrokerNodes(numBrokers)
+                            .setNumControllerNodes(numBrokers)
+                            .build()
+            );
+
+            brokerConfig.forEach((k, v) -> clusterBuilder.setConfigProp((String) k, v));
+            cluster = clusterBuilder.build();
+            cluster.nonFatalFaultHandler().setIgnore(true);
+        } catch (Exception e) {
+            throw new ConnectException("Failed to create test Kafka cluster", e);
+        }
         this.brokerConfig = brokerConfig;
-        // Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether
-        // a listener config is defined during initialization in order to know if it's
-        // safe to override it
-        hasListenerConfig = brokerConfig.get(KafkaConfig.ListenersProp()) != null;
-
         this.clientConfigs = clientConfigs;
     }
 
-    /**
-     * Starts the Kafka cluster alone using the ports that were assigned during initialization of
-     * the harness.
-     *
-     * @throws ConnectException if a directory to store the data cannot be created
-     */
-    public void startOnlyKafkaOnSamePorts() {
-        doStart();
-    }
-
     public void start() {
-        // pick a random port
-        zookeeper = new EmbeddedZookeeper();
-        Arrays.fill(currentBrokerPorts, 0);
-        Arrays.fill(currentBrokerLogDirs, null);
-        doStart();
-    }
-
-    private void doStart() {
-        brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
-
-        putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
-        putIfAbsent(brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
-        putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
-        putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), false);
-        // reduce the size of the log cleaner map to reduce test memory usage
-        putIfAbsent(brokerConfig, KafkaConfig.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
-
-        Object listenerConfig = brokerConfig.get(KafkaConfig.InterBrokerListenerNameProp());
-        if (listenerConfig == null)
-            listenerConfig = brokerConfig.get(KafkaConfig.InterBrokerSecurityProtocolProp());
-        if (listenerConfig == null)
-            listenerConfig = "PLAINTEXT";
-        listenerName = new ListenerName(listenerConfig.toString());
-
-        for (int i = 0; i < brokers.length; i++) {
-            brokerConfig.put(KafkaConfig.BrokerIdProp(), i);
-            currentBrokerLogDirs[i] = currentBrokerLogDirs[i] == null ? createLogDir() : currentBrokerLogDirs[i];
-            brokerConfig.put(KafkaConfig.LogDirProp(), currentBrokerLogDirs[i]);
-            if (!hasListenerConfig)
-                brokerConfig.put(KafkaConfig.ListenersProp(), listenerName.value() + "://localhost:" + currentBrokerPorts[i]);
-            brokers[i] = TestUtils.createServer(new KafkaConfig(brokerConfig, true), time);
-            currentBrokerPorts[i] = brokers[i].boundPort(listenerName);
+        try {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+        } catch (Exception e) {
+            throw new ConnectException("Failed to start test Kafka cluster", e);
         }
 
         Map<String, Object> producerProps = new HashMap<>(clientConfigs);
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+
```

Review Comment:
?
########## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ##########

```diff
@@ -187,120 +146,73 @@ private void doStart() {
-    private String createLogDir() {
+    public void stop() {
+        if (producer != null) {
+            producer.close();
+        }
         try {
-            return Files.createTempDirectory(getClass().getSimpleName()).toString();
-        } catch (IOException e) {
-            log.error("Unable to create temporary log directory", e);
-            throw new ConnectException("Unable to create temporary log directory", e);
+            cluster.close();
+        } catch (Exception e) {
+            throw new ConnectException("Failed to shutdown test Kafka cluster", e);
         }
     }
 
     public String bootstrapServers() {
-        return Arrays.stream(brokers)
-                .map(this::address)
-                .collect(Collectors.joining(","));
-    }
-
-    public String address(KafkaServer server) {
-        final EndPoint endPoint = server.advertisedListeners().head();
-        return endPoint.host() + ":" + endPoint.port();
-    }
-
-    public String zKConnectString() {
-        return "127.0.0.1:" + zookeeper.port();
+        return cluster.bootstrapServers();
    }
 
     /**
      * Get the brokers that have a {@link BrokerState#RUNNING} state.
      *
-     * @return the list of {@link KafkaServer} instances that are running;
-     *         never null but possibly empty
+     * @return the list of {@link BrokerServer} instances that are running;
+     *         never null but possibly empty
      */
-    public Set<KafkaServer> runningBrokers() {
+    public Set<BrokerServer> runningBrokers() {
         return brokersInState(state -> state == BrokerState.RUNNING);
     }
 
     /**
      * Get the brokers whose state match the given predicate.
      *
-     * @return the list of {@link KafkaServer} instances with states that match the predicate;
-     *         never null but possibly empty
+     * @return the list of {@link BrokerServer} instances with states that match the predicate;
+     *         never null but possibly empty
      */
-    public Set<KafkaServer> brokersInState(Predicate<BrokerState> desiredState) {
-        return Arrays.stream(brokers)
-                .filter(b -> hasState(b, desiredState))
-                .collect(Collectors.toSet());
+    public Set<BrokerServer> brokersInState(Predicate<BrokerState> desiredState) {
+        return cluster.brokers().values().stream()
+                .filter(b -> hasState(b, desiredState))
+                .collect(Collectors.toSet());
     }
 
-    protected boolean hasState(KafkaServer server, Predicate<BrokerState> desiredState) {
+    protected boolean hasState(BrokerServer server, Predicate<BrokerState> desiredState) {
         try {
             return desiredState.test(server.brokerState());
         } catch (Throwable e) {
             // Broker failed to respond.
             return false;
         }
     }
-
```

Review Comment:
?

########## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ##########

```diff
@@ -187,120 +146,73 @@ private void doStart() {
+    public void stopOnlyBrokers() {
+        for (BrokerServer broker : cluster.brokers().values()) {
+            broker.shutdown();
+            broker.awaitShutdown();
```

Review Comment:
Nit: can be a couple one-liners, which may also be more efficient (we trigger shutdown in every broker and then await shutdown for every broker, instead of triggering and awaiting each broker one-by-one):

```java
cluster.brokers().values().forEach(BrokerServer::shutdown);
cluster.brokers().values().forEach(BrokerServer::awaitShutdown);
```

########## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ##########

```diff
@@ -187,120 +146,73 @@ private void doStart() {
     public String bootstrapServers() {
-        return Arrays.stream(brokers)
-                .map(this::address)
-                .collect(Collectors.joining(","));
-    }
-
-    public String address(KafkaServer server) {
-        final EndPoint endPoint = server.advertisedListeners().head();
-        return endPoint.host() + ":" + endPoint.port();
-    }
-
-    public String zKConnectString() {
-        return "127.0.0.1:" + zookeeper.port();
+        return cluster.bootstrapServers();
```

Review Comment:
So much cleaner! 🎉

########## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ##########

```diff
@@ -89,96 +82,62 @@ import static org.junit.Assert.assertFalse;
-        putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
-        putIfAbsent(brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
-        putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
-        putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), false);
-        // reduce the size of the log cleaner map to reduce test memory usage
-        putIfAbsent(brokerConfig, KafkaConfig.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
```

Review Comment:
Don't we want to keep this part?
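The part being asked about is the log cleaner dedupe buffer override that kept test memory usage down. A minimal sketch of carrying it over into the new `addDefaultBrokerPropsIfAbsent` helper, assuming the KRaft test kit honors the same broker property (and using the `Properties::putIfAbsent` simplification suggested in the next comment):

```java
private void addDefaultBrokerPropsIfAbsent(Properties brokerConfig, int numBrokers) {
    brokerConfig.putIfAbsent(KafkaConfig.DeleteTopicEnableProp(), "true");
    brokerConfig.putIfAbsent(KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0");
    brokerConfig.putIfAbsent(KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(numBrokers));
    brokerConfig.putIfAbsent(KafkaConfig.AutoCreateTopicsEnableProp(), "false");
    // Assumption: reinstate the old harness's override that reduced the size of the
    // log cleaner dedupe buffer (and thereby test memory usage).
    brokerConfig.putIfAbsent(KafkaConfig.LogCleanerDedupeBufferSizeProp(), String.valueOf(2 * 1024 * 1024L));
}
```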
########## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ##########

```diff
@@ -656,6 +567,19 @@ public KafkaProducer<byte[], byte[]> createProducer(Map<String, Object> producer
         return producer;
     }
 
+    private void addDefaultBrokerPropsIfAbsent(Properties brokerConfig, int numBrokers) {
+        putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), "true");
+        putIfAbsent(brokerConfig, KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0");
+        putIfAbsent(brokerConfig, KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(numBrokers));
+        putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), "false");
+    }
+
+    private static void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) {
+        if (!props.containsKey(propertyKey)) {
+            props.put(propertyKey, propertyValue);
+        }
+    }
```

Review Comment:
Can't we just use `Properties::putIfAbsent`? IIRC the only reason we have the variant below for `Map<String, Object>` is because it was implemented when we still supported Java versions before 1.8.

```suggestion
    private void addDefaultBrokerPropsIfAbsent(Properties brokerConfig, int numBrokers) {
        brokerConfig.putIfAbsent(KafkaConfig.DeleteTopicEnableProp(), "true");
        brokerConfig.putIfAbsent(KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0");
        brokerConfig.putIfAbsent(KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(numBrokers));
        brokerConfig.putIfAbsent(KafkaConfig.AutoCreateTopicsEnableProp(), "false");
    }
```

########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java: ##########

```diff
@@ -621,12 +622,21 @@ public void testConnectorReconfiguration() throws Exception {
      */
     @Test
     public void testTasksFailOnInabilityToFence() throws Exception {
-        brokerProps.put("authorizer.class.name", "kafka.security.authorizer.AclAuthorizer");
-        brokerProps.put("sasl.enabled.mechanisms", "PLAIN");
-        brokerProps.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
-        brokerProps.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
-        brokerProps.put("listeners", "SASL_PLAINTEXT://localhost:0");
-        brokerProps.put("listener.name.sasl_plaintext.plain.sasl.jaas.config",
+        brokerProps.put(KafkaConfig.AuthorizerClassNameProp(), "org.apache.kafka.metadata.authorizer.StandardAuthorizer");
+        brokerProps.put(KafkaConfig.SaslEnabledMechanismsProp(), "PLAIN");
+        brokerProps.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp(), "PLAIN");
+        brokerProps.put(KafkaConfig.SaslMechanismControllerProtocolProp(), "PLAIN");
+        brokerProps.put(KafkaConfig.ListenersProp(), "EXTERNAL://localhost:0,CONTROLLER://localhost:0");
+        brokerProps.put(KafkaConfig.InterBrokerListenerNameProp(), "EXTERNAL");
+        brokerProps.put(KafkaConfig.ControllerListenerNamesProp(), "CONTROLLER");
+        brokerProps.put(KafkaConfig.ListenerSecurityProtocolMapProp(), "CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT");
+        brokerProps.put("listener.name.external.plain.sasl.jaas.config",
+                "org.apache.kafka.common.security.plain.PlainLoginModule required " +
+                "username=\"super\" " +
+                "password=\"super_pwd\" " +
+                "user_connector=\"connector_pwd\" " +
+                "user_super=\"super_pwd\";");
```

Review Comment:
Can we pull this out into a variable and use it here and with the `listener.name.controller.plain.sasl.jaas.config` property?
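Tying the two JAAS comments together, the hypothetical `PLAIN_JAAS_CONFIG` constant sketched after the MirrorConnectors comment above would serve here as well:

```java
// Assumes the hypothetical PLAIN_JAAS_CONFIG constant from the earlier sketch.
brokerProps.put("listener.name.external.plain.sasl.jaas.config", PLAIN_JAAS_CONFIG);
brokerProps.put("listener.name.controller.plain.sasl.jaas.config", PLAIN_JAAS_CONFIG);
```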