C0urante commented on code in PR #13375:
URL: https://github.com/apache/kafka/pull/13375#discussion_r1145563634


##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -187,120 +146,73 @@ private void doStart() {
         producer = new KafkaProducer<>(producerProps, new 
ByteArraySerializer(), new ByteArraySerializer());
     }
 
-    public void stopOnlyKafka() {
-        stop(false, false);
-    }
-
-    public void stop() {
-        stop(true, true);
-    }
-
-    private void stop(boolean deleteLogDirs, boolean stopZK) {
-        try {
-            if (producer != null) {
-                producer.close();
-            }
-        } catch (Exception e) {
-            log.error("Could not shutdown producer ", e);
-            throw new RuntimeException("Could not shutdown producer", e);
-        }
-
-        for (KafkaServer broker : brokers) {
-            try {
-                broker.shutdown();
-            } catch (Throwable t) {
-                String msg = String.format("Could not shutdown broker at %s", 
address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-
-        if (deleteLogDirs) {
-            for (KafkaServer broker : brokers) {
-                try {
-                    log.info("Cleaning up kafka log dirs at {}", 
broker.config().logDirs());
-                    CoreUtils.delete(broker.config().logDirs());
-                } catch (Throwable t) {
-                    String msg = String.format("Could not clean up log dirs 
for broker at %s",
-                            address(broker));
-                    log.error(msg, t);
-                    throw new RuntimeException(msg, t);
-                }
-            }
-        }
-
-        try {
-            if (stopZK) {
-                zookeeper.shutdown();
-            }
-        } catch (Throwable t) {
-            String msg = String.format("Could not shutdown zookeeper at %s", 
zKConnectString());
-            log.error(msg, t);
-            throw new RuntimeException(msg, t);
+    /**
+     * Restarts the Kafka brokers. This can be called after {@link 
#stopOnlyBrokers()}. Note that if the Kafka brokers
+     * need to be listening on the same ports as earlier, the {@link 
#brokerConfig} should contain the
+     * {@link KafkaConfig#ListenersProp} property and it should use a fixed 
non-zero free port.

Review Comment:
   If we decide to keep this strategy, we should also note here that overriding 
the listeners property can only be done for single-node Kafka clusters.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -187,120 +146,73 @@ private void doStart() {
         producer = new KafkaProducer<>(producerProps, new 
ByteArraySerializer(), new ByteArraySerializer());
     }
 
-    public void stopOnlyKafka() {
-        stop(false, false);
-    }
-
-    public void stop() {
-        stop(true, true);
-    }
-
-    private void stop(boolean deleteLogDirs, boolean stopZK) {
-        try {
-            if (producer != null) {
-                producer.close();
-            }
-        } catch (Exception e) {
-            log.error("Could not shutdown producer ", e);
-            throw new RuntimeException("Could not shutdown producer", e);
-        }
-
-        for (KafkaServer broker : brokers) {
-            try {
-                broker.shutdown();
-            } catch (Throwable t) {
-                String msg = String.format("Could not shutdown broker at %s", 
address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-
-        if (deleteLogDirs) {
-            for (KafkaServer broker : brokers) {
-                try {
-                    log.info("Cleaning up kafka log dirs at {}", 
broker.config().logDirs());
-                    CoreUtils.delete(broker.config().logDirs());
-                } catch (Throwable t) {
-                    String msg = String.format("Could not clean up log dirs 
for broker at %s",
-                            address(broker));
-                    log.error(msg, t);
-                    throw new RuntimeException(msg, t);
-                }
-            }
-        }
-
-        try {
-            if (stopZK) {
-                zookeeper.shutdown();
-            }
-        } catch (Throwable t) {
-            String msg = String.format("Could not shutdown zookeeper at %s", 
zKConnectString());
-            log.error(msg, t);
-            throw new RuntimeException(msg, t);
+    /**
+     * Restarts the Kafka brokers. This can be called after {@link 
#stopOnlyBrokers()}. Note that if the Kafka brokers
+     * need to be listening on the same ports as earlier, the {@link 
#brokerConfig} should contain the
+     * {@link KafkaConfig#ListenersProp} property and it should use a fixed 
non-zero free port.
+     */
+    public void restartOnlyBrokers() {
+        for (BrokerServer broker : cluster.brokers().values()) {
+            broker.startup();
         }
     }
 
-    private static void putIfAbsent(final Properties props, final String 
propertyKey, final Object propertyValue) {
-        if (!props.containsKey(propertyKey)) {
-            props.put(propertyKey, propertyValue);
+    /**
+     * Stop only the Kafka brokers (and not the KRaft controllers). This can 
be used to test Connect's functionality
+     * when the backing Kafka cluster goes offline.
+     */
+    public void stopOnlyBrokers() {
+        for (BrokerServer broker : cluster.brokers().values()) {
+            broker.shutdown();
+            broker.awaitShutdown();
         }
     }
 
-    private String createLogDir() {
+    public void stop() {
+        if (producer != null) {
+            producer.close();

Review Comment:
   This skips the rest of the shutdown process if we fail to close the producer 
for some reason.
   
   Seems like a job for `Utils::closeQuietly`:
   
   ```java
   AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
   Utils.closeQuietly(producer, "producer for embedded Kafka cluster", 
shutdownFailure);
   Utils.closeQuietly(cluster, "test Kafka cluster", shutdownFailure);
   if (shutdownFailure.get() != null)
       throw new ConnectException("Failed to shut down embedded Kafka cluster", 
shutdownFailure.get());
   ```



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java:
##########
@@ -67,12 +68,21 @@ public class 
MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi
      * enable ACL on brokers.
      */
     protected static void enableAclAuthorizer(Properties brokerProps) {
-        brokerProps.put("authorizer.class.name", 
"kafka.security.authorizer.AclAuthorizer");
-        brokerProps.put("sasl.enabled.mechanisms", "PLAIN");
-        brokerProps.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
-        brokerProps.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
-        brokerProps.put("listeners", "SASL_PLAINTEXT://localhost:0");
-        brokerProps.put("listener.name.sasl_plaintext.plain.sasl.jaas.config",
+        brokerProps.put(KafkaConfig.AuthorizerClassNameProp(), 
"org.apache.kafka.metadata.authorizer.StandardAuthorizer");
+        brokerProps.put(KafkaConfig.SaslEnabledMechanismsProp(), "PLAIN");
+        brokerProps.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp(), 
"PLAIN");
+        brokerProps.put(KafkaConfig.SaslMechanismControllerProtocolProp(), 
"PLAIN");
+        brokerProps.put(KafkaConfig.ListenersProp(), 
"EXTERNAL://localhost:0,CONTROLLER://localhost:0");
+        brokerProps.put(KafkaConfig.InterBrokerListenerNameProp(), "EXTERNAL");
+        brokerProps.put(KafkaConfig.ControllerListenerNamesProp(), 
"CONTROLLER");
+        brokerProps.put(KafkaConfig.ListenerSecurityProtocolMapProp(), 
"CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT");
+        brokerProps.put("listener.name.external.plain.sasl.jaas.config",
+                "org.apache.kafka.common.security.plain.PlainLoginModule 
required "
+                        + "username=\"super\" "
+                        + "password=\"super_pwd\" "
+                        + "user_connector=\"connector_pwd\" "
+                        + "user_super=\"super_pwd\";");

Review Comment:
   Same thought RE pulling this string value out into a variable.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -187,120 +146,73 @@ private void doStart() {
         producer = new KafkaProducer<>(producerProps, new 
ByteArraySerializer(), new ByteArraySerializer());
     }
 
-    public void stopOnlyKafka() {
-        stop(false, false);
-    }
-
-    public void stop() {
-        stop(true, true);
-    }
-
-    private void stop(boolean deleteLogDirs, boolean stopZK) {
-        try {
-            if (producer != null) {
-                producer.close();
-            }
-        } catch (Exception e) {
-            log.error("Could not shutdown producer ", e);
-            throw new RuntimeException("Could not shutdown producer", e);
-        }
-
-        for (KafkaServer broker : brokers) {
-            try {
-                broker.shutdown();
-            } catch (Throwable t) {
-                String msg = String.format("Could not shutdown broker at %s", 
address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-
-        if (deleteLogDirs) {
-            for (KafkaServer broker : brokers) {
-                try {
-                    log.info("Cleaning up kafka log dirs at {}", 
broker.config().logDirs());
-                    CoreUtils.delete(broker.config().logDirs());
-                } catch (Throwable t) {
-                    String msg = String.format("Could not clean up log dirs 
for broker at %s",
-                            address(broker));
-                    log.error(msg, t);
-                    throw new RuntimeException(msg, t);
-                }
-            }
-        }
-
-        try {
-            if (stopZK) {
-                zookeeper.shutdown();
-            }
-        } catch (Throwable t) {
-            String msg = String.format("Could not shutdown zookeeper at %s", 
zKConnectString());
-            log.error(msg, t);
-            throw new RuntimeException(msg, t);
+    /**
+     * Restarts the Kafka brokers. This can be called after {@link 
#stopOnlyBrokers()}. Note that if the Kafka brokers
+     * need to be listening on the same ports as earlier, the {@link 
#brokerConfig} should contain the
+     * {@link KafkaConfig#ListenersProp} property and it should use a fixed 
non-zero free port.
+     */
+    public void restartOnlyBrokers() {
+        for (BrokerServer broker : cluster.brokers().values()) {
+            broker.startup();

Review Comment:
   Nit: can be a one-liner:
   ```java
   cluster.brokers().values().forEach(BrokerServer::startup);
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -89,96 +82,62 @@
 import static org.junit.Assert.assertFalse;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and 
specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link 
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can 
be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. 
Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to 
easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
     private static final Logger log = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120);
 
-    // Kafka Config
-    private final KafkaServer[] brokers;
+    private final KafkaClusterTestKit cluster;
     private final Properties brokerConfig;
-    private final Time time = new MockTime();
-    private final int[] currentBrokerPorts;
-    private final String[] currentBrokerLogDirs;
-    private final boolean hasListenerConfig;
-
-    final Map<String, String> clientConfigs;
-
-    private EmbeddedZookeeper zookeeper = null;
-    private ListenerName listenerName = new ListenerName("PLAINTEXT");
+    private final Map<String, String> clientConfigs;
     private KafkaProducer<byte[], byte[]> producer;
 
-    public EmbeddedKafkaCluster(final int numBrokers,
-                                final Properties brokerConfig) {
+    public EmbeddedKafkaCluster(final int numBrokers, final Properties 
brokerConfig) {
         this(numBrokers, brokerConfig, Collections.emptyMap());
     }
 
     public EmbeddedKafkaCluster(final int numBrokers,
-                                final Properties brokerConfig,
-                                final Map<String, String> clientConfigs) {
-        brokers = new KafkaServer[numBrokers];
-        currentBrokerPorts = new int[numBrokers];
-        currentBrokerLogDirs = new String[numBrokers];
+                                   final Properties brokerConfig,
+                                   final Map<String, String> clientConfigs) {
+        addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+        try {
+            KafkaClusterTestKit.Builder clusterBuilder = new 
KafkaClusterTestKit.Builder(
+                    new TestKitNodes.Builder()
+                            .setCoResident(true)
+                            .setNumBrokerNodes(numBrokers)
+                            .setNumControllerNodes(numBrokers)
+                            .build()
+            );
+
+            brokerConfig.forEach((k, v) -> 
clusterBuilder.setConfigProp((String) k, v));
+            cluster = clusterBuilder.build();
+            cluster.nonFatalFaultHandler().setIgnore(true);
+        } catch (Exception e) {
+            throw new ConnectException("Failed to create test Kafka cluster", 
e);
+        }
         this.brokerConfig = brokerConfig;
-        // Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we 
track whether
-        // a listener config is defined during initialization in order to know 
if it's
-        // safe to override it
-        hasListenerConfig = brokerConfig.get(KafkaConfig.ListenersProp()) != 
null;
-
         this.clientConfigs = clientConfigs;
     }
 
-    /**
-     * Starts the Kafka cluster alone using the ports that were assigned 
during initialization of
-     * the harness.
-     *
-     * @throws ConnectException if a directory to store the data cannot be 
created
-     */
-    public void startOnlyKafkaOnSamePorts() {
-        doStart();
-    }
-
     public void start() {
-        // pick a random port
-        zookeeper = new EmbeddedZookeeper();
-        Arrays.fill(currentBrokerPorts, 0);
-        Arrays.fill(currentBrokerLogDirs, null);
-        doStart();
-    }
-
-    private void doStart() {
-        brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
-
-        putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
-        putIfAbsent(brokerConfig, 
KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
-        putIfAbsent(brokerConfig, 
KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
-        putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), 
false);
-        // reduce the size of the log cleaner map to reduce test memory usage
-        putIfAbsent(brokerConfig, 
KafkaConfig.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
-
-        Object listenerConfig = 
brokerConfig.get(KafkaConfig.InterBrokerListenerNameProp());
-        if (listenerConfig == null)
-            listenerConfig = 
brokerConfig.get(KafkaConfig.InterBrokerSecurityProtocolProp());
-        if (listenerConfig == null)
-            listenerConfig = "PLAINTEXT";
-        listenerName = new ListenerName(listenerConfig.toString());
-
-        for (int i = 0; i < brokers.length; i++) {
-            brokerConfig.put(KafkaConfig.BrokerIdProp(), i);
-            currentBrokerLogDirs[i] = currentBrokerLogDirs[i] == null ? 
createLogDir() : currentBrokerLogDirs[i];
-            brokerConfig.put(KafkaConfig.LogDirProp(), 
currentBrokerLogDirs[i]);
-            if (!hasListenerConfig)
-                brokerConfig.put(KafkaConfig.ListenersProp(), 
listenerName.value() + "://localhost:" + currentBrokerPorts[i]);
-            brokers[i] = TestUtils.createServer(new KafkaConfig(brokerConfig, 
true), time);
-            currentBrokerPorts[i] = brokers[i].boundPort(listenerName);
+        try {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+        } catch (Exception e) {
+            throw new ConnectException("Failed to start test Kafka cluster", 
e);
         }
 
         Map<String, Object> producerProps = new HashMap<>(clientConfigs);
         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers());
+

Review Comment:
   ?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -187,120 +146,73 @@ private void doStart() {
         producer = new KafkaProducer<>(producerProps, new 
ByteArraySerializer(), new ByteArraySerializer());
     }
 
-    public void stopOnlyKafka() {
-        stop(false, false);
-    }
-
-    public void stop() {
-        stop(true, true);
-    }
-
-    private void stop(boolean deleteLogDirs, boolean stopZK) {
-        try {
-            if (producer != null) {
-                producer.close();
-            }
-        } catch (Exception e) {
-            log.error("Could not shutdown producer ", e);
-            throw new RuntimeException("Could not shutdown producer", e);
-        }
-
-        for (KafkaServer broker : brokers) {
-            try {
-                broker.shutdown();
-            } catch (Throwable t) {
-                String msg = String.format("Could not shutdown broker at %s", 
address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-
-        if (deleteLogDirs) {
-            for (KafkaServer broker : brokers) {
-                try {
-                    log.info("Cleaning up kafka log dirs at {}", 
broker.config().logDirs());
-                    CoreUtils.delete(broker.config().logDirs());
-                } catch (Throwable t) {
-                    String msg = String.format("Could not clean up log dirs 
for broker at %s",
-                            address(broker));
-                    log.error(msg, t);
-                    throw new RuntimeException(msg, t);
-                }
-            }
-        }
-
-        try {
-            if (stopZK) {
-                zookeeper.shutdown();
-            }
-        } catch (Throwable t) {
-            String msg = String.format("Could not shutdown zookeeper at %s", 
zKConnectString());
-            log.error(msg, t);
-            throw new RuntimeException(msg, t);
+    /**
+     * Restarts the Kafka brokers. This can be called after {@link 
#stopOnlyBrokers()}. Note that if the Kafka brokers
+     * need to be listening on the same ports as earlier, the {@link 
#brokerConfig} should contain the
+     * {@link KafkaConfig#ListenersProp} property and it should use a fixed 
non-zero free port.
+     */
+    public void restartOnlyBrokers() {
+        for (BrokerServer broker : cluster.brokers().values()) {
+            broker.startup();
         }
     }
 
-    private static void putIfAbsent(final Properties props, final String 
propertyKey, final Object propertyValue) {
-        if (!props.containsKey(propertyKey)) {
-            props.put(propertyKey, propertyValue);
+    /**
+     * Stop only the Kafka brokers (and not the KRaft controllers). This can 
be used to test Connect's functionality
+     * when the backing Kafka cluster goes offline.
+     */
+    public void stopOnlyBrokers() {
+        for (BrokerServer broker : cluster.brokers().values()) {
+            broker.shutdown();
+            broker.awaitShutdown();
         }
     }
 
-    private String createLogDir() {
+    public void stop() {
+        if (producer != null) {
+            producer.close();
+        }
         try {
-            return 
Files.createTempDirectory(getClass().getSimpleName()).toString();
-        } catch (IOException e) {
-            log.error("Unable to create temporary log directory", e);
-            throw new ConnectException("Unable to create temporary log 
directory", e);
+            cluster.close();
+        } catch (Exception e) {
+            throw new ConnectException("Failed to shutdown test Kafka 
cluster", e);
         }
     }
 
     public String bootstrapServers() {
-        return Arrays.stream(brokers)
-                .map(this::address)
-                .collect(Collectors.joining(","));
-    }
-
-    public String address(KafkaServer server) {
-        final EndPoint endPoint = server.advertisedListeners().head();
-        return endPoint.host() + ":" + endPoint.port();
-    }
-
-    public String zKConnectString() {
-        return "127.0.0.1:" + zookeeper.port();
+        return cluster.bootstrapServers();
     }
 
     /**
      * Get the brokers that have a {@link BrokerState#RUNNING} state.
      *
-     * @return the list of {@link KafkaServer} instances that are running;
-     *         never null but  possibly empty
+     * @return the list of {@link BrokerServer} instances that are running;
+     *         never null but possibly empty
      */
-    public Set<KafkaServer> runningBrokers() {
+    public Set<BrokerServer> runningBrokers() {
         return brokersInState(state -> state == BrokerState.RUNNING);
     }
 
     /**
      * Get the brokers whose state match the given predicate.
      *
-     * @return the list of {@link KafkaServer} instances with states that 
match the predicate;
-     *         never null but  possibly empty
+     * @return the list of {@link BrokerServer} instances with states that 
match the predicate;
+     *         never null but possibly empty
      */
-    public Set<KafkaServer> brokersInState(Predicate<BrokerState> 
desiredState) {
-        return Arrays.stream(brokers)
-                     .filter(b -> hasState(b, desiredState))
-                     .collect(Collectors.toSet());
+    public Set<BrokerServer> brokersInState(Predicate<BrokerState> 
desiredState) {
+        return cluster.brokers().values().stream()
+                .filter(b -> hasState(b, desiredState))
+                .collect(Collectors.toSet());
     }
 
-    protected boolean hasState(KafkaServer server, Predicate<BrokerState> 
desiredState) {
+    protected boolean hasState(BrokerServer server, Predicate<BrokerState> 
desiredState) {
         try {
             return desiredState.test(server.brokerState());
         } catch (Throwable e) {
             // Broker failed to respond.
             return false;
         }
     }
-    

Review Comment:
   ?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -187,120 +146,73 @@ private void doStart() {
         producer = new KafkaProducer<>(producerProps, new 
ByteArraySerializer(), new ByteArraySerializer());
     }
 
-    public void stopOnlyKafka() {
-        stop(false, false);
-    }
-
-    public void stop() {
-        stop(true, true);
-    }
-
-    private void stop(boolean deleteLogDirs, boolean stopZK) {
-        try {
-            if (producer != null) {
-                producer.close();
-            }
-        } catch (Exception e) {
-            log.error("Could not shutdown producer ", e);
-            throw new RuntimeException("Could not shutdown producer", e);
-        }
-
-        for (KafkaServer broker : brokers) {
-            try {
-                broker.shutdown();
-            } catch (Throwable t) {
-                String msg = String.format("Could not shutdown broker at %s", 
address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-
-        if (deleteLogDirs) {
-            for (KafkaServer broker : brokers) {
-                try {
-                    log.info("Cleaning up kafka log dirs at {}", 
broker.config().logDirs());
-                    CoreUtils.delete(broker.config().logDirs());
-                } catch (Throwable t) {
-                    String msg = String.format("Could not clean up log dirs 
for broker at %s",
-                            address(broker));
-                    log.error(msg, t);
-                    throw new RuntimeException(msg, t);
-                }
-            }
-        }
-
-        try {
-            if (stopZK) {
-                zookeeper.shutdown();
-            }
-        } catch (Throwable t) {
-            String msg = String.format("Could not shutdown zookeeper at %s", 
zKConnectString());
-            log.error(msg, t);
-            throw new RuntimeException(msg, t);
+    /**
+     * Restarts the Kafka brokers. This can be called after {@link 
#stopOnlyBrokers()}. Note that if the Kafka brokers
+     * need to be listening on the same ports as earlier, the {@link 
#brokerConfig} should contain the
+     * {@link KafkaConfig#ListenersProp} property and it should use a fixed 
non-zero free port.
+     */
+    public void restartOnlyBrokers() {
+        for (BrokerServer broker : cluster.brokers().values()) {
+            broker.startup();
         }
     }
 
-    private static void putIfAbsent(final Properties props, final String 
propertyKey, final Object propertyValue) {
-        if (!props.containsKey(propertyKey)) {
-            props.put(propertyKey, propertyValue);
+    /**
+     * Stop only the Kafka brokers (and not the KRaft controllers). This can 
be used to test Connect's functionality
+     * when the backing Kafka cluster goes offline.
+     */
+    public void stopOnlyBrokers() {
+        for (BrokerServer broker : cluster.brokers().values()) {
+            broker.shutdown();
+            broker.awaitShutdown();

Review Comment:
   Nit: can be a couple one-liners, which may also be more efficient (we 
trigger shutdown in every broker and then await shutdown for every broker, 
instead of triggering and awaiting each broker one-by-one):
   
   ```java
   cluster.brokers().values().forEach(BrokerServer::shutdown);
   cluster.brokers().values().forEach(BrokerServer::awaitShutdown);
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -187,120 +146,73 @@ private void doStart() {
         producer = new KafkaProducer<>(producerProps, new 
ByteArraySerializer(), new ByteArraySerializer());
     }
 
-    public void stopOnlyKafka() {
-        stop(false, false);
-    }
-
-    public void stop() {
-        stop(true, true);
-    }
-
-    private void stop(boolean deleteLogDirs, boolean stopZK) {
-        try {
-            if (producer != null) {
-                producer.close();
-            }
-        } catch (Exception e) {
-            log.error("Could not shutdown producer ", e);
-            throw new RuntimeException("Could not shutdown producer", e);
-        }
-
-        for (KafkaServer broker : brokers) {
-            try {
-                broker.shutdown();
-            } catch (Throwable t) {
-                String msg = String.format("Could not shutdown broker at %s", 
address(broker));
-                log.error(msg, t);
-                throw new RuntimeException(msg, t);
-            }
-        }
-
-        if (deleteLogDirs) {
-            for (KafkaServer broker : brokers) {
-                try {
-                    log.info("Cleaning up kafka log dirs at {}", 
broker.config().logDirs());
-                    CoreUtils.delete(broker.config().logDirs());
-                } catch (Throwable t) {
-                    String msg = String.format("Could not clean up log dirs 
for broker at %s",
-                            address(broker));
-                    log.error(msg, t);
-                    throw new RuntimeException(msg, t);
-                }
-            }
-        }
-
-        try {
-            if (stopZK) {
-                zookeeper.shutdown();
-            }
-        } catch (Throwable t) {
-            String msg = String.format("Could not shutdown zookeeper at %s", 
zKConnectString());
-            log.error(msg, t);
-            throw new RuntimeException(msg, t);
+    /**
+     * Restarts the Kafka brokers. This can be called after {@link 
#stopOnlyBrokers()}. Note that if the Kafka brokers
+     * need to be listening on the same ports as earlier, the {@link 
#brokerConfig} should contain the
+     * {@link KafkaConfig#ListenersProp} property and it should use a fixed 
non-zero free port.
+     */
+    public void restartOnlyBrokers() {
+        for (BrokerServer broker : cluster.brokers().values()) {
+            broker.startup();
         }
     }
 
-    private static void putIfAbsent(final Properties props, final String 
propertyKey, final Object propertyValue) {
-        if (!props.containsKey(propertyKey)) {
-            props.put(propertyKey, propertyValue);
+    /**
+     * Stop only the Kafka brokers (and not the KRaft controllers). This can 
be used to test Connect's functionality
+     * when the backing Kafka cluster goes offline.
+     */
+    public void stopOnlyBrokers() {
+        for (BrokerServer broker : cluster.brokers().values()) {
+            broker.shutdown();
+            broker.awaitShutdown();
         }
     }
 
-    private String createLogDir() {
+    public void stop() {
+        if (producer != null) {
+            producer.close();
+        }
         try {
-            return 
Files.createTempDirectory(getClass().getSimpleName()).toString();
-        } catch (IOException e) {
-            log.error("Unable to create temporary log directory", e);
-            throw new ConnectException("Unable to create temporary log 
directory", e);
+            cluster.close();
+        } catch (Exception e) {
+            throw new ConnectException("Failed to shutdown test Kafka 
cluster", e);
         }
     }
 
     public String bootstrapServers() {
-        return Arrays.stream(brokers)
-                .map(this::address)
-                .collect(Collectors.joining(","));
-    }
-
-    public String address(KafkaServer server) {
-        final EndPoint endPoint = server.advertisedListeners().head();
-        return endPoint.host() + ":" + endPoint.port();
-    }
-
-    public String zKConnectString() {
-        return "127.0.0.1:" + zookeeper.port();
+        return cluster.bootstrapServers();

Review Comment:
   So much cleaner! 🎉 



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -89,96 +82,62 @@
 import static org.junit.Assert.assertFalse;
 
 /**
- * Setup an embedded Kafka cluster with specified number of brokers and 
specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link 
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can 
be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. 
Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to 
easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {
 
     private static final Logger log = 
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
 
-    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120);
 
-    // Kafka Config
-    private final KafkaServer[] brokers;
+    private final KafkaClusterTestKit cluster;
     private final Properties brokerConfig;
-    private final Time time = new MockTime();
-    private final int[] currentBrokerPorts;
-    private final String[] currentBrokerLogDirs;
-    private final boolean hasListenerConfig;
-
-    final Map<String, String> clientConfigs;
-
-    private EmbeddedZookeeper zookeeper = null;
-    private ListenerName listenerName = new ListenerName("PLAINTEXT");
+    private final Map<String, String> clientConfigs;
     private KafkaProducer<byte[], byte[]> producer;
 
-    public EmbeddedKafkaCluster(final int numBrokers,
-                                final Properties brokerConfig) {
+    public EmbeddedKafkaCluster(final int numBrokers, final Properties 
brokerConfig) {
         this(numBrokers, brokerConfig, Collections.emptyMap());
     }
 
     public EmbeddedKafkaCluster(final int numBrokers,
-                                final Properties brokerConfig,
-                                final Map<String, String> clientConfigs) {
-        brokers = new KafkaServer[numBrokers];
-        currentBrokerPorts = new int[numBrokers];
-        currentBrokerLogDirs = new String[numBrokers];
+                                   final Properties brokerConfig,
+                                   final Map<String, String> clientConfigs) {
+        addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+        try {
+            KafkaClusterTestKit.Builder clusterBuilder = new 
KafkaClusterTestKit.Builder(
+                    new TestKitNodes.Builder()
+                            .setCoResident(true)
+                            .setNumBrokerNodes(numBrokers)
+                            .setNumControllerNodes(numBrokers)
+                            .build()
+            );
+
+            brokerConfig.forEach((k, v) -> 
clusterBuilder.setConfigProp((String) k, v));
+            cluster = clusterBuilder.build();
+            cluster.nonFatalFaultHandler().setIgnore(true);
+        } catch (Exception e) {
+            throw new ConnectException("Failed to create test Kafka cluster", 
e);
+        }
         this.brokerConfig = brokerConfig;
-        // Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we 
track whether
-        // a listener config is defined during initialization in order to know 
if it's
-        // safe to override it
-        hasListenerConfig = brokerConfig.get(KafkaConfig.ListenersProp()) != 
null;
-
         this.clientConfigs = clientConfigs;
     }
 
-    /**
-     * Starts the Kafka cluster alone using the ports that were assigned 
during initialization of
-     * the harness.
-     *
-     * @throws ConnectException if a directory to store the data cannot be 
created
-     */
-    public void startOnlyKafkaOnSamePorts() {
-        doStart();
-    }
-
     public void start() {
-        // pick a random port
-        zookeeper = new EmbeddedZookeeper();
-        Arrays.fill(currentBrokerPorts, 0);
-        Arrays.fill(currentBrokerLogDirs, null);
-        doStart();
-    }
-
-    private void doStart() {
-        brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
-
-        putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
-        putIfAbsent(brokerConfig, 
KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
-        putIfAbsent(brokerConfig, 
KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
-        putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), 
false);
-        // reduce the size of the log cleaner map to reduce test memory usage
-        putIfAbsent(brokerConfig, 
KafkaConfig.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);

Review Comment:
   Don't we want to keep this part?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -656,6 +567,19 @@ public KafkaProducer<byte[], byte[]> 
createProducer(Map<String, Object> producer
         return producer;
     }
 
+    private void addDefaultBrokerPropsIfAbsent(Properties brokerConfig, int 
numBrokers) {
+        putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), "true");
+        putIfAbsent(brokerConfig, 
KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0");
+        putIfAbsent(brokerConfig, 
KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(numBrokers));
+        putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), 
"false");
+    }
+
+    private static void putIfAbsent(final Properties props, final String 
propertyKey, final Object propertyValue) {
+        if (!props.containsKey(propertyKey)) {
+            props.put(propertyKey, propertyValue);
+        }
+    }

Review Comment:
   Can't we just use `Properties::putIfAbsent`? IIRC the only reason we have 
the variant below for `Map<String, Object>` is because it was implemented when 
we still supported Java versions before 1.8.
   
   ```suggestion
       private void addDefaultBrokerPropsIfAbsent(Properties brokerConfig, int 
numBrokers) {
           brokerConfig.putIfAbsent(KafkaConfig.DeleteTopicEnableProp(), 
"true");
           
brokerConfig.putIfAbsent(KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0");
           
brokerConfig.putIfAbsent(KafkaConfig.OffsetsTopicReplicationFactorProp(), 
String.valueOf(numBrokers));
           brokerConfig.putIfAbsent(KafkaConfig.AutoCreateTopicsEnableProp(), 
"false");
       }
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java:
##########
@@ -621,12 +622,21 @@ public void testConnectorReconfiguration() throws 
Exception {
      */
     @Test
     public void testTasksFailOnInabilityToFence() throws Exception {
-        brokerProps.put("authorizer.class.name", 
"kafka.security.authorizer.AclAuthorizer");
-        brokerProps.put("sasl.enabled.mechanisms", "PLAIN");
-        brokerProps.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
-        brokerProps.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
-        brokerProps.put("listeners", "SASL_PLAINTEXT://localhost:0");
-        brokerProps.put("listener.name.sasl_plaintext.plain.sasl.jaas.config",
+        brokerProps.put(KafkaConfig.AuthorizerClassNameProp(), 
"org.apache.kafka.metadata.authorizer.StandardAuthorizer");
+        brokerProps.put(KafkaConfig.SaslEnabledMechanismsProp(), "PLAIN");
+        brokerProps.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp(), 
"PLAIN");
+        brokerProps.put(KafkaConfig.SaslMechanismControllerProtocolProp(), 
"PLAIN");
+        brokerProps.put(KafkaConfig.ListenersProp(), 
"EXTERNAL://localhost:0,CONTROLLER://localhost:0");
+        brokerProps.put(KafkaConfig.InterBrokerListenerNameProp(), "EXTERNAL");
+        brokerProps.put(KafkaConfig.ControllerListenerNamesProp(), 
"CONTROLLER");
+        brokerProps.put(KafkaConfig.ListenerSecurityProtocolMapProp(), 
"CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT");
+        brokerProps.put("listener.name.external.plain.sasl.jaas.config",
+                "org.apache.kafka.common.security.plain.PlainLoginModule 
required "
+                        + "username=\"super\" "
+                        + "password=\"super_pwd\" "
+                        + "user_connector=\"connector_pwd\" "
+                        + "user_super=\"super_pwd\";");

Review Comment:
   Can we pull this out into a variable and use it here and with the 
`istener.name.controller.plain.sasl.jaas.config` property?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to