This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 2902e3b1c0e8fd8662352181de0adae6f14b7326
Author: Aleksandr Savonin <[email protected]>
AuthorDate: Sun Jan 18 14:51:04 2026 +0100

    [FLINK-38937] Migrate all Kafka tests to TestKafkaContainer with KRaft mode
---
 .../flink-end-to-end-tests-common-kafka/pom.xml    | 11 +--
 .../tests/util/kafka/KafkaContainerClient.java     |  8 +-
 .../flink/tests/util/kafka/KafkaSinkE2ECase.java   | 18 +++--
 .../flink/tests/util/kafka/KafkaSourceE2ECase.java | 20 ++---
 .../util/kafka/SQLClientSchemaRegistryITCase.java  | 59 +++++++-------
 .../flink/tests/util/kafka/SmokeKafkaITCase.java   |  9 +--
 .../sink/FlinkKafkaInternalProducerITCase.java     |  6 +-
 .../connector/kafka/sink/KafkaSinkITCase.java      | 22 ++---
 .../kafka/sink/KafkaTransactionLogITCase.java      | 26 +++---
 .../connector/kafka/sink/KafkaWriterTestBase.java  |  5 +-
 .../sink/internal/ProducerPoolImplITCase.java      |  6 +-
 .../testutils/KafkaSinkExternalContextFactory.java |  9 +--
 .../connector/kafka/source/KafkaSourceITCase.java  | 17 ++--
 .../kafka/testutils/DockerImageVersions.java       |  2 -
 .../DynamicKafkaSourceExternalContextFactory.java  | 13 ++-
 .../KafkaSourceExternalContextFactory.java         |  8 +-
 .../flink/connector/kafka/testutils/KafkaUtil.java | 11 +--
 .../kafka/testutils/TwoKafkaContainers.java        | 13 ++-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 94 ++++++----------------
 .../metrics/KafkaMetricMutableWrapperTest.java     |  5 +-
 .../connectors/kafka/table/KafkaTableTestBase.java |  5 +-
 21 files changed, 153 insertions(+), 214 deletions(-)

diff --git a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml
index acc72be6..98baaaa2 100644
--- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml
+++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/pom.xml
@@ -59,10 +59,6 @@ under the License.
                        <groupId>org.testcontainers</groupId>
                        <artifactId>testcontainers</artifactId>
                </dependency>
-               <dependency>
-                       <groupId>junit</groupId>
-                       <artifactId>junit</artifactId>
-               </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-test-utils-junit</artifactId>
@@ -107,11 +103,6 @@ under the License.
                        <artifactId>flink-core-api</artifactId>
                        <scope>test</scope>
                </dependency>
-               <dependency>
-                       <groupId>org.hamcrest</groupId>
-                       <artifactId>hamcrest-core</artifactId>
-                       <scope>test</scope>
-               </dependency>
                <dependency>
                        <groupId>org.assertj</groupId>
                        <artifactId>assertj-core</artifactId>
@@ -262,7 +253,7 @@ under the License.
                             
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
                         </artifactItem>
                     </artifactItems>
-                                       
<ignoredUnusedDeclaredDependencies>org.apache.flink:flink-streaming-kafka-test,org.apache.flink:flink-sql-avro,org.apache.flink:flink-sql-avro-confluent-registry,org.apache.flink:flink-connector-base,org.apache.flink:flink-sql-connector-kafka
+                                       
<ignoredUnusedDeclaredDependencies>org.apache.flink:flink-streaming-kafka-test,org.apache.flink:flink-sql-avro,org.apache.flink:flink-sql-avro-confluent-registry,org.apache.flink:flink-connector-base,org.apache.flink:flink-sql-connector-kafka,org.testcontainers:kafka
                                        </ignoredUnusedDeclaredDependencies>
                 </configuration>
             </plugin>
diff --git a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java
index d3f45e0e..e70e1343 100644
--- a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java
+++ b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java
@@ -19,6 +19,7 @@
 package org.apache.flink.tests.util.kafka;
 
 import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.connector.kafka.testutils.TestKafkaContainer;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.apache.kafka.clients.CommonClientConfigs;
@@ -42,7 +43,6 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -53,12 +53,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-/** A utility class that exposes common methods over a {@link KafkaContainer}. */
+/** A utility class that exposes common methods over a {@link TestKafkaContainer}. */
 public class KafkaContainerClient {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaContainerClient.class);
-    private final KafkaContainer container;
+    private final TestKafkaContainer container;
 
-    public KafkaContainerClient(KafkaContainer container) {
+    public KafkaContainerClient(TestKafkaContainer container) {
         this.container = container;
     }
 
diff --git 
a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
 
b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
index ea9a0079..479e3b3b 100644
--- 
a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
+++ 
b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSinkE2ECase.java
@@ -21,6 +21,7 @@ package org.apache.flink.tests.util.kafka;
 import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 import 
org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory;
 import org.apache.flink.connector.kafka.testutils.DockerImageVersions;
+import org.apache.flink.connector.kafka.testutils.TestKafkaContainer;
 import 
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment;
 import 
org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
@@ -31,8 +32,7 @@ import 
org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.test.resources.ResourceTestUtils;
 
-import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.containers.GenericContainer;
 
 import java.util.Arrays;
 
@@ -50,13 +50,15 @@ public class KafkaSinkE2ECase extends 
SinkTestSuiteBase<String> {
     // Defines TestEnvironment
     @TestEnv FlinkContainerTestEnvironment flink = new 
FlinkContainerTestEnvironment(1, 6);
 
+    private final TestKafkaContainer kafkaContainer =
+            new TestKafkaContainer(DockerImageVersions.KAFKA).withNetworkAliases(KAFKA_HOSTNAME);
+
     // Defines ConnectorExternalSystem
+    @SuppressWarnings({"rawtypes", "unchecked"})
     @TestExternalSystem
-    DefaultContainerizedExternalSystem<KafkaContainer> kafka =
+    DefaultContainerizedExternalSystem<?> kafka =
             DefaultContainerizedExternalSystem.builder()
-                    .fromContainer(
-                            new 
KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
-                                    .withNetworkAliases(KAFKA_HOSTNAME))
+                    .fromContainer((GenericContainer) kafkaContainer.getContainer())
                     
.bindWithFlinkContainer(flink.getFlinkContainers().getJobManager())
                     .build();
 
@@ -65,7 +67,7 @@ public class KafkaSinkE2ECase extends 
SinkTestSuiteBase<String> {
     @TestContext
     KafkaSinkExternalContextFactory incrementing =
             new KafkaSinkExternalContextFactory(
-                    kafka.getContainer(),
+                    kafkaContainer,
                     Arrays.asList(
                             
ResourceTestUtils.getResource("kafka-connector.jar")
                                     .toAbsolutePath()
@@ -84,7 +86,7 @@ public class KafkaSinkE2ECase extends 
SinkTestSuiteBase<String> {
     @TestContext
     KafkaSinkExternalContextFactory pooling =
             new KafkaSinkExternalContextFactory(
-                    kafka.getContainer(),
+                    kafkaContainer,
                     Arrays.asList(
                             
ResourceTestUtils.getResource("kafka-connector.jar")
                                     .toAbsolutePath()
diff --git 
a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
 
b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
index 1a2ac1f2..67f7dfa3 100644
--- 
a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
+++ 
b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.tests.util.kafka;
 
-import org.apache.flink.connector.kafka.testutils.DockerImageVersions;
 import 
org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContextFactory;
+import org.apache.flink.connector.kafka.testutils.TestKafkaContainer;
 import 
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment;
 import 
org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
@@ -30,11 +30,11 @@ import 
org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.test.resources.ResourceTestUtils;
 
-import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.containers.GenericContainer;
 
 import java.util.Arrays;
 
+import static 
org.apache.flink.connector.kafka.testutils.DockerImageVersions.KAFKA;
 import static 
org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContext.SplitMappingMode.PARTITION;
 import static 
org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContext.SplitMappingMode.TOPIC;
 
@@ -48,13 +48,15 @@ public class KafkaSourceE2ECase extends 
SourceTestSuiteBase<String> {
     // Defines TestEnvironment
     @TestEnv FlinkContainerTestEnvironment flink = new 
FlinkContainerTestEnvironment(1, 6);
 
+    TestKafkaContainer kafkaContainer =
+            new TestKafkaContainer(KAFKA).withNetworkAliases(KAFKA_HOSTNAME);
+
     // Defines ConnectorExternalSystem
+    @SuppressWarnings({"rawtypes", "unchecked"})
     @TestExternalSystem
-    DefaultContainerizedExternalSystem<KafkaContainer> kafka =
+    DefaultContainerizedExternalSystem<?> kafka =
             DefaultContainerizedExternalSystem.builder()
-                    .fromContainer(
-                            new 
KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
-                                    .withNetworkAliases(KAFKA_HOSTNAME))
+                    .fromContainer((GenericContainer) 
kafkaContainer.getContainer())
                     
.bindWithFlinkContainer(flink.getFlinkContainers().getJobManager())
                     .build();
 
@@ -64,7 +66,7 @@ public class KafkaSourceE2ECase extends 
SourceTestSuiteBase<String> {
     @TestContext
     KafkaSourceExternalContextFactory singleTopic =
             new KafkaSourceExternalContextFactory(
-                    kafka.getContainer(),
+                    kafkaContainer,
                     Arrays.asList(
                             
ResourceTestUtils.getResource("kafka-connector.jar").toUri().toURL(),
                             
ResourceTestUtils.getResource("kafka-clients.jar").toUri().toURL()),
@@ -74,7 +76,7 @@ public class KafkaSourceE2ECase extends 
SourceTestSuiteBase<String> {
     @TestContext
     KafkaSourceExternalContextFactory multipleTopic =
             new KafkaSourceExternalContextFactory(
-                    kafka.getContainer(),
+                    kafkaContainer,
                     Arrays.asList(
                             
ResourceTestUtils.getResource("kafka-connector.jar").toUri().toURL(),
                             
ResourceTestUtils.getResource("kafka-clients.jar").toUri().toURL()),
diff --git 
a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
 
b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
index 522eff09..986aa982 100644
--- 
a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
+++ 
b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.tests.util.kafka;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.connector.kafka.testutils.DockerImageVersions;
 import org.apache.flink.connector.kafka.testutils.KafkaUtil;
+import org.apache.flink.connector.kafka.testutils.TestKafkaContainer;
 import org.apache.flink.connector.testframe.container.FlinkContainers;
 import org.apache.flink.connector.testframe.container.TestcontainersSettings;
 import org.apache.flink.test.resources.ResourceTestUtils;
@@ -36,13 +37,13 @@ import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
-import org.testcontainers.containers.KafkaContainer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
 import org.testcontainers.utility.DockerImageName;
 
 import java.nio.file.Path;
@@ -53,11 +54,12 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** End-to-end test for SQL client using Avro Confluent Registry format. */
-public class SQLClientSchemaRegistryITCase {
+@Testcontainers
+@Timeout(value = 10, unit = TimeUnit.MINUTES)
+class SQLClientSchemaRegistryITCase {
     public static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
     public static final String INTER_CONTAINER_REGISTRY_ALIAS = "registry";
     private static final Path sqlAvroJar = 
ResourceTestUtils.getResource(".*avro.jar");
@@ -65,29 +67,27 @@ public class SQLClientSchemaRegistryITCase {
             ResourceTestUtils.getResource(".*avro-confluent.jar");
     private final Path sqlConnectorKafkaJar = 
ResourceTestUtils.getResource(".*kafka.jar");
 
-    @ClassRule public static final Network NETWORK = Network.newNetwork();
+    public static final Network NETWORK = Network.newNetwork();
 
-    @ClassRule public static final Timeout TIMEOUT = new Timeout(10, 
TimeUnit.MINUTES);
-
-    @ClassRule
-    public static final KafkaContainer KAFKA =
+    @Container
+    public static final TestKafkaContainer KAFKA_CONTAINER =
             KafkaUtil.createKafkaContainer(SQLClientSchemaRegistryITCase.class)
                     .withNetwork(NETWORK)
                     .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
 
-    @ClassRule
+    @Container
     public static final SchemaRegistryContainer REGISTRY =
             new 
SchemaRegistryContainer(DockerImageName.parse(DockerImageVersions.SCHEMA_REGISTRY))
-                    .withKafka(INTER_CONTAINER_KAFKA_ALIAS + ":9092")
+                    .withKafka(INTER_CONTAINER_KAFKA_ALIAS + ":9093")
                     .withNetwork(NETWORK)
                     .withNetworkAliases(INTER_CONTAINER_REGISTRY_ALIAS)
-                    .dependsOn(KAFKA);
+                    .dependsOn(KAFKA_CONTAINER.getContainer());
 
     public final TestcontainersSettings testcontainersSettings =
             TestcontainersSettings.builder()
                     .network(NETWORK)
                     .logger(KafkaUtil.getLogger("flink", 
SQLClientSchemaRegistryITCase.class))
-                    .dependsOn(KAFKA)
+                    .dependsOn(KAFKA_CONTAINER.getContainer())
                     .build();
 
     public final FlinkContainers flink =
@@ -96,14 +96,14 @@ public class SQLClientSchemaRegistryITCase {
     private KafkaContainerClient kafkaClient;
     private CachedSchemaRegistryClient registryClient;
 
-    @Before
+    @BeforeEach
     public void setUp() throws Exception {
         flink.start();
-        kafkaClient = new KafkaContainerClient(KAFKA);
+        kafkaClient = new KafkaContainerClient(KAFKA_CONTAINER);
         registryClient = new 
CachedSchemaRegistryClient(REGISTRY.getSchemaRegistryUrl(), 10);
     }
 
-    @After
+    @AfterEach
     public void tearDown() {
         flink.stop();
     }
@@ -140,7 +140,7 @@ public class SQLClientSchemaRegistryITCase {
                         " 'connector' = 'kafka',",
                         " 'properties.bootstrap.servers' = '"
                                 + INTER_CONTAINER_KAFKA_ALIAS
-                                + ":9092',",
+                                + ":9093',",
                         " 'topic' = '" + testCategoryTopic + "',",
                         " 'scan.startup.mode' = 'earliest-offset',",
                         " 'properties.group.id' = 'test-group',",
@@ -158,7 +158,7 @@ public class SQLClientSchemaRegistryITCase {
                         " 'connector' = 'kafka',",
                         " 'properties.bootstrap.servers' = '"
                                 + INTER_CONTAINER_KAFKA_ALIAS
-                                + ":9092',",
+                                + ":9093',",
                         " 'properties.group.id' = 'test-group',",
                         " 'topic' = '" + testResultsTopic + "',",
                         " 'format' = 'csv',",
@@ -171,7 +171,7 @@ public class SQLClientSchemaRegistryITCase {
         List<String> categories =
                 kafkaClient.readMessages(
                         1, "test-group", testResultsTopic, new 
StringDeserializer());
-        assertThat(categories, 
equalTo(Collections.singletonList("1,electronics,null")));
+        
assertThat(categories).isEqualTo(Collections.singletonList("1,electronics,null"));
     }
 
     @Test
@@ -193,7 +193,7 @@ public class SQLClientSchemaRegistryITCase {
                         " 'connector' = 'kafka',",
                         " 'properties.bootstrap.servers' = '"
                                 + INTER_CONTAINER_KAFKA_ALIAS
-                                + ":9092',",
+                                + ":9093',",
                         " 'topic' = '" + testUserBehaviorTopic + "',",
                         " 'format' = 'avro-confluent',",
                         " 'avro-confluent.url' = 'http://";
@@ -207,7 +207,7 @@ public class SQLClientSchemaRegistryITCase {
         executeSqlStatements(sqlLines);
 
         List<Integer> versions = getAllVersions(behaviourSubject);
-        assertThat(versions.size(), equalTo(1));
+        assertThat(versions).hasSize(1);
         List<Object> userBehaviors =
                 kafkaClient.readMessages(
                         1,
@@ -219,9 +219,8 @@ public class SQLClientSchemaRegistryITCase {
                 registryClient.getByVersion(behaviourSubject, versions.get(0), 
false).getSchema();
         Schema userBehaviorSchema = new Schema.Parser().parse(schemaString);
         GenericRecordBuilder recordBuilder = new 
GenericRecordBuilder(userBehaviorSchema);
-        assertThat(
-                userBehaviors,
-                equalTo(
+        assertThat(userBehaviors)
+                .isEqualTo(
                         Collections.singletonList(
                                 recordBuilder
                                         .set("user_id", 1L)
@@ -229,7 +228,7 @@ public class SQLClientSchemaRegistryITCase {
                                         .set("category_id", 1L)
                                         .set("behavior", "buy")
                                         .set("ts", 1234000L)
-                                        .build())));
+                                        .build()));
     }
 
     private List<Integer> getAllVersions(String behaviourSubject) throws 
Exception {
diff --git 
a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
 
b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
index 2f1ad81a..e0d7d291 100644
--- 
a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
+++ 
b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.connector.kafka.testutils.KafkaUtil;
+import org.apache.flink.connector.kafka.testutils.TestKafkaContainer;
 import org.apache.flink.connector.testframe.container.FlinkContainers;
 import org.apache.flink.connector.testframe.container.FlinkContainersSettings;
 import org.apache.flink.connector.testframe.container.TestcontainersSettings;
@@ -45,7 +46,6 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.Network;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
@@ -74,16 +74,15 @@ class SmokeKafkaITCase {
     private static final String EXAMPLE_JAR_MATCHER = 
"flink-streaming-kafka-test.*";
 
     @Container
-    public static final KafkaContainer KAFKA_CONTAINER =
+    public static final TestKafkaContainer KAFKA_CONTAINER =
             createKafkaContainer(SmokeKafkaITCase.class)
-                    .withEmbeddedZookeeper()
                     .withNetwork(NETWORK)
                     .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
 
     public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
             TestcontainersSettings.builder()
                     .logger(KafkaUtil.getLogger("flink", 
SmokeKafkaITCase.class))
-                    .dependsOn(KAFKA_CONTAINER)
+                    .dependsOn(KAFKA_CONTAINER.getContainer())
                     .build();
 
     @RegisterExtension
@@ -174,7 +173,7 @@ class SmokeKafkaITCase {
                                                                 String.join(
                                                                         ":",
                                                                         host,
-                                                                        
Integer.toString(9092)))
+                                                                        
Integer.toString(9093)))
                                                 
.collect(Collectors.joining(","))))
                         .addArgument("--group.id", "myconsumer")
                         .addArgument("--auto.offset.reset", "earliest")
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
index 6bd6d880..c3ba30dd 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
@@ -19,6 +19,7 @@ package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
+import org.apache.flink.connector.kafka.testutils.TestKafkaContainer;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.util.TestLoggerExtension;
@@ -42,7 +43,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.MethodSource;
-import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
@@ -73,8 +73,8 @@ class FlinkKafkaInternalProducerITCase {
                             .build());
 
     @Container
-    private static final KafkaContainer KAFKA_CONTAINER =
-            
createKafkaContainer(FlinkKafkaInternalProducerITCase.class).withEmbeddedZookeeper();
+    private static final TestKafkaContainer KAFKA_CONTAINER =
+            createKafkaContainer(FlinkKafkaInternalProducerITCase.class);
 
     @AfterEach
     public void check() {
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index aa0f97ee..773b13b8 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -42,6 +42,7 @@ import 
org.apache.flink.connector.datagen.source.DataGeneratorSource;
 import 
org.apache.flink.connector.kafka.sink.testutils.KafkaSinkExternalContextFactory;
 import org.apache.flink.connector.kafka.testutils.DockerImageVersions;
 import org.apache.flink.connector.kafka.testutils.KafkaUtil;
+import org.apache.flink.connector.kafka.testutils.TestKafkaContainer;
 import 
org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
 import 
org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
@@ -95,7 +96,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.Network;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
@@ -151,9 +152,8 @@ public class KafkaSinkITCase extends TestLogger {
                             .build());
 
     @Container
-    public static final KafkaContainer KAFKA_CONTAINER =
+    public static final TestKafkaContainer KAFKA_CONTAINER =
             createKafkaContainer(KafkaSinkITCase.class)
-                    .withEmbeddedZookeeper()
                     .withNetwork(NETWORK)
                     .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
 
@@ -193,13 +193,15 @@ public class KafkaSinkITCase extends TestLogger {
         // Defines test environment on Flink MiniCluster
         @TestEnv MiniClusterTestEnvironment flink = new 
MiniClusterTestEnvironment();
 
+        private final TestKafkaContainer kafkaContainer =
+                new 
TestKafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA));
+
         // Defines external system
+        @SuppressWarnings({"rawtypes", "unchecked"})
         @TestExternalSystem
-        DefaultContainerizedExternalSystem<KafkaContainer> kafka =
+        DefaultContainerizedExternalSystem<?> kafka =
                 DefaultContainerizedExternalSystem.builder()
-                        .fromContainer(
-                                new KafkaContainer(
-                                        
DockerImageName.parse(DockerImageVersions.KAFKA)))
+                        .fromContainer((GenericContainer) 
kafkaContainer.getContainer())
                         .build();
 
         @TestSemantics
@@ -211,16 +213,14 @@ public class KafkaSinkITCase extends TestLogger {
         @TestContext
         KafkaSinkExternalContextFactory incrementing =
                 new KafkaSinkExternalContextFactory(
-                        kafka.getContainer(),
+                        kafkaContainer,
                         Collections.emptyList(),
                         TransactionNamingStrategy.INCREMENTING);
 
         @TestContext
         KafkaSinkExternalContextFactory pooling =
                 new KafkaSinkExternalContextFactory(
-                        kafka.getContainer(),
-                        Collections.emptyList(),
-                        TransactionNamingStrategy.POOLING);
+                        kafkaContainer, Collections.emptyList(), 
TransactionNamingStrategy.POOLING);
     }
 
     @Test
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java
index f11e3c71..5c9a2607 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java
@@ -19,9 +19,10 @@ package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionRecord;
+import org.apache.flink.connector.kafka.testutils.TestKafkaContainer;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.junit5.MiniClusterExtension;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -29,11 +30,12 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.junit.After;
-import org.junit.ClassRule;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -51,7 +53,9 @@ import static 
org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaCo
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link KafkaTransactionLog} to retrieve abortable Kafka transactions. */
-public class KafkaTransactionLogITCase extends TestLogger {
+@Testcontainers
+@ExtendWith(TestLoggerExtension.class)
+class KafkaTransactionLogITCase {
 
     private static final String TOPIC_NAME = "kafkaTransactionLogTest";
     private static final String TRANSACTIONAL_ID_PREFIX = "kafka-log";
@@ -65,14 +69,14 @@ public class KafkaTransactionLogITCase extends TestLogger {
                             .setConfiguration(new Configuration())
                             .build());
 
-    @ClassRule
-    public static final KafkaContainer KAFKA_CONTAINER =
-            createKafkaContainer(KafkaTransactionLogITCase.class).withEmbeddedZookeeper();
+    @Container
+    public static final TestKafkaContainer KAFKA_CONTAINER =
+            createKafkaContainer(KafkaTransactionLogITCase.class);
 
     private final List<Producer<byte[], Integer>> openProducers = new ArrayList<>();
 
-    @After
-    public void tearDown() {
+    @AfterEach
+    void tearDown() {
         openProducers.forEach(Producer::close);
         checkProducerLeak();
     }
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
index 3c13b9ad..16ae534d 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
 import org.apache.flink.connector.kafka.sink.internal.BackchannelFactory;
 import org.apache.flink.connector.kafka.sink.internal.TransactionFinished;
 import org.apache.flink.connector.kafka.sink.internal.WritableBackchannel;
+import org.apache.flink.connector.kafka.testutils.TestKafkaContainer;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.metrics.groups.OperatorMetricGroup;
@@ -46,7 +47,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.Network;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
@@ -89,9 +89,8 @@ public abstract class KafkaWriterTestBase {
     protected TriggerTimeService timeService;
 
     @Container
-    public static final KafkaContainer KAFKA_CONTAINER =
+    public static final TestKafkaContainer KAFKA_CONTAINER =
             createKafkaContainer(KafkaWriterTestBase.class)
-                    .withEmbeddedZookeeper()
                     .withNetwork(NETWORK)
                     .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
 
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java
index d25aece8..aac6b91c 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImplITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.kafka.sink.internal;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.kafka.testutils.TestKafkaContainer;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 
@@ -29,7 +30,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
-import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
@@ -59,8 +59,8 @@ class ProducerPoolImplITCase {
     public static final String TRANSACTIONAL_ID = "test-transactional-id";
 
     @Container
-    public static final KafkaContainer KAFKA_CONTAINER =
-            createKafkaContainer(ProducerPoolImplITCase.class).withEmbeddedZookeeper();
+    public static final TestKafkaContainer KAFKA_CONTAINER =
+            createKafkaContainer(ProducerPoolImplITCase.class);
 
     @AfterEach
     void checkLeak() {
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java
index bdebed03..8aff2c97 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContextFactory.java
@@ -19,10 +19,9 @@
 package org.apache.flink.connector.kafka.sink.testutils;
 
 import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
+import org.apache.flink.connector.kafka.testutils.TestKafkaContainer;
 import org.apache.flink.connector.testframe.external.ExternalContextFactory;
 
-import org.testcontainers.containers.KafkaContainer;
-
 import java.net.URL;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -31,12 +30,12 @@ import java.util.stream.Collectors;
 public class KafkaSinkExternalContextFactory
         implements ExternalContextFactory<KafkaSinkExternalContext> {
 
-    private final KafkaContainer kafkaContainer;
+    private final TestKafkaContainer kafkaContainer;
     private final List<URL> connectorJars;
     private final TransactionNamingStrategy transactionNamingStrategy;
 
     public KafkaSinkExternalContextFactory(
-            KafkaContainer kafkaContainer,
+            TestKafkaContainer kafkaContainer,
             List<URL> connectorJars,
             TransactionNamingStrategy transactionNamingStrategy) {
         this.kafkaContainer = kafkaContainer;
@@ -47,7 +46,7 @@ public class KafkaSinkExternalContextFactory
     private String getBootstrapServer() {
         final String internalEndpoints =
                 kafkaContainer.getNetworkAliases().stream()
-                        .map(host -> String.join(":", host, Integer.toString(9092)))
+                        .map(host -> String.join(":", host, Integer.toString(9093)))
                         .collect(Collectors.joining(","));
         return String.join(",", kafkaContainer.getBootstrapServers(), internalEndpoints);
     }
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index 02b4f991..c5fe60ee 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDe
 import org.apache.flink.connector.kafka.testutils.DockerImageVersions;
 import 
org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContextFactory;
 import org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv;
+import org.apache.flink.connector.kafka.testutils.TestKafkaContainer;
 import 
org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
 import 
org.apache.flink.connector.testframe.external.DefaultContainerizedExternalSystem;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
@@ -64,7 +65,7 @@ import org.junit.jupiter.api.TestInstance.Lifecycle;
 import org.junit.jupiter.api.parallel.ResourceLock;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
-import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.utility.DockerImageName;
 
 import java.io.IOException;
@@ -419,13 +420,15 @@ public class KafkaSourceITCase {
         @TestEnv
         MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
 
+        private final TestKafkaContainer kafkaContainer =
+                new TestKafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA));
+
         // Defines external system
+        @SuppressWarnings({"rawtypes", "unchecked"})
         @TestExternalSystem
-        DefaultContainerizedExternalSystem<KafkaContainer> kafka =
+        DefaultContainerizedExternalSystem<?> kafka =
                 DefaultContainerizedExternalSystem.builder()
-                        .fromContainer(
-                                new KafkaContainer(
-                                        DockerImageName.parse(DockerImageVersions.KAFKA)))
+                        .fromContainer((GenericContainer) kafkaContainer.getContainer())
                         .build();
 
         // Defines 2 External context Factories, so test cases will be invoked twice using these two
@@ -434,13 +437,13 @@ public class KafkaSourceITCase {
         @TestContext
         KafkaSourceExternalContextFactory singleTopic =
                 new KafkaSourceExternalContextFactory(
-                        kafka.getContainer(), Collections.emptyList(), PARTITION);
+                        kafkaContainer, Collections.emptyList(), PARTITION);
 
         @SuppressWarnings("unused")
         @TestContext
         KafkaSourceExternalContextFactory multipleTopic =
                 new KafkaSourceExternalContextFactory(
-                        kafka.getContainer(), Collections.emptyList(), TOPIC);
+                        kafkaContainer, Collections.emptyList(), TOPIC);
     }
 
     // -----------------
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DockerImageVersions.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DockerImageVersions.java
index 3704f417..ec1e1535 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DockerImageVersions.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DockerImageVersions.java
@@ -28,6 +28,4 @@ public class DockerImageVersions {
     public static final String APACHE_KAFKA = "apache/kafka:4.1.1";
 
     public static final String SCHEMA_REGISTRY = 
"confluentinc/cp-schema-registry:7.9.2";
-
-    public static final String ZOOKEEPER = "zookeeper:3.8.4";
 }
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DynamicKafkaSourceExternalContextFactory.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DynamicKafkaSourceExternalContextFactory.java
index 71798e18..fa584a56 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DynamicKafkaSourceExternalContextFactory.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DynamicKafkaSourceExternalContextFactory.java
@@ -22,7 +22,6 @@ import 
org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSource;
 import org.apache.flink.connector.testframe.external.ExternalContextFactory;
 
 import com.google.common.collect.ImmutableList;
-import org.testcontainers.containers.KafkaContainer;
 
 import java.net.URL;
 import java.util.List;
@@ -32,13 +31,13 @@ import java.util.stream.Collectors;
 public class DynamicKafkaSourceExternalContextFactory
         implements ExternalContextFactory<DynamicKafkaSourceExternalContext> {
 
-    private final KafkaContainer kafkaContainer0;
-    private final KafkaContainer kafkaContainer1;
+    private final TestKafkaContainer kafkaContainer0;
+    private final TestKafkaContainer kafkaContainer1;
     private final List<URL> connectorJars;
 
     public DynamicKafkaSourceExternalContextFactory(
-            KafkaContainer kafkaContainer0,
-            KafkaContainer kafkaContainer1,
+            TestKafkaContainer kafkaContainer0,
+            TestKafkaContainer kafkaContainer1,
             List<URL> connectorJars) {
         this.kafkaContainer0 = kafkaContainer0;
         this.kafkaContainer1 = kafkaContainer1;
@@ -53,10 +52,10 @@ public class DynamicKafkaSourceExternalContextFactory
                 connectorJars);
     }
 
-    private static String getBootstrapServers(KafkaContainer kafkaContainer) {
+    private static String getBootstrapServers(TestKafkaContainer kafkaContainer) {
         final String internalEndpoints =
                 kafkaContainer.getNetworkAliases().stream()
-                        .map(host -> String.join(":", host, Integer.toString(9092)))
+                        .map(host -> String.join(":", host, Integer.toString(9093)))
                         .collect(Collectors.joining(","));
         return String.join(",", kafkaContainer.getBootstrapServers(), internalEndpoints);
     }
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContextFactory.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContextFactory.java
index ef9113af..fc9f1ba8 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContextFactory.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContextFactory.java
@@ -20,8 +20,6 @@ package org.apache.flink.connector.kafka.testutils;
 
 import org.apache.flink.connector.testframe.external.ExternalContextFactory;
 
-import org.testcontainers.containers.KafkaContainer;
-
 import java.net.URL;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -30,12 +28,12 @@ import java.util.stream.Collectors;
 public class KafkaSourceExternalContextFactory
         implements ExternalContextFactory<KafkaSourceExternalContext> {
 
-    private final KafkaContainer kafkaContainer;
+    private final TestKafkaContainer kafkaContainer;
     private final List<URL> connectorJars;
     private final KafkaSourceExternalContext.SplitMappingMode splitMappingMode;
 
     public KafkaSourceExternalContextFactory(
-            KafkaContainer kafkaContainer,
+            TestKafkaContainer kafkaContainer,
             List<URL> connectorJars,
             KafkaSourceExternalContext.SplitMappingMode splitMappingMode) {
         this.kafkaContainer = kafkaContainer;
@@ -46,7 +44,7 @@ public class KafkaSourceExternalContextFactory
     protected String getBootstrapServer() {
         final String internalEndpoints =
                 kafkaContainer.getNetworkAliases().stream()
-                        .map(host -> String.join(":", host, Integer.toString(9092)))
+                        .map(host -> String.join(":", host, Integer.toString(9093)))
                         .collect(Collectors.joining(","));
         return String.join(",", kafkaContainer.getBootstrapServers(), internalEndpoints);
     }
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java
index 1b4bf77c..934c1c92 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.utility.DockerImageName;
 
@@ -52,28 +51,26 @@ public class KafkaUtil {
     private KafkaUtil() {}
 
     /** This method helps to set commonly used Kafka configurations and sets up the logger. */
-    public static KafkaContainer createKafkaContainer(Class<?> testCase) {
+    public static TestKafkaContainer createKafkaContainer(Class<?> testCase) {
         return createKafkaContainer(getContainerName("kafka", testCase));
     }
 
     /** This method helps to set commonly used Kafka configurations and sets up the logger. */
-    public static KafkaContainer createKafkaContainer(String containerName) {
+    public static TestKafkaContainer createKafkaContainer(String containerName) {
         Logger logger = getLogger(containerName);
 
         String logLevel = inferLogLevel(logger);
 
         Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(logger, true);
-        return new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
+        return new TestKafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
                 .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
                 .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
                 .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
-                .withEnv("KAFKA_LOG4J_ROOT_LOGLEVEL", logLevel)
-                .withEnv("KAFKA_LOG4J_LOGGERS", "state.change.logger=" + logLevel)
+                .withKafkaLogLevel(logLevel)
                 .withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
                 .withEnv(
                         "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
                         String.valueOf(Duration.ofHours(2).toMillis()))
-                .withEnv("KAFKA_LOG4J_TOOLS_ROOT_LOGLEVEL", logLevel)
                 .withLogConsumer(logConsumer);
     }
 
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/TwoKafkaContainers.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/TwoKafkaContainers.java
index 6c5036a9..247357e4 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/TwoKafkaContainers.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/TwoKafkaContainers.java
@@ -19,18 +19,17 @@
 package org.apache.flink.connector.kafka.testutils;
 
 import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.utility.DockerImageName;
 
 /** Wraps 2 Kafka containers into one for test utilities that only accept one 
container. */
 public class TwoKafkaContainers extends GenericContainer<TwoKafkaContainers> {
-    private final KafkaContainer kafka0;
-    private final KafkaContainer kafka1;
+    private final TestKafkaContainer kafka0;
+    private final TestKafkaContainer kafka1;
 
     public TwoKafkaContainers() {
         DockerImageName dockerImageName = 
DockerImageName.parse(DockerImageVersions.KAFKA);
-        this.kafka0 = new KafkaContainer(dockerImageName);
-        this.kafka1 = new KafkaContainer(dockerImageName);
+        this.kafka0 = new TestKafkaContainer(dockerImageName);
+        this.kafka1 = new TestKafkaContainer(dockerImageName);
     }
 
     @Override
@@ -50,11 +49,11 @@ public class TwoKafkaContainers extends 
GenericContainer<TwoKafkaContainers> {
         kafka1.stop();
     }
 
-    public KafkaContainer getKafka0() {
+    public TestKafkaContainer getKafka0() {
         return kafka0;
     }
 
-    public KafkaContainer getKafka1() {
+    public TestKafkaContainer getKafka1() {
         return kafka1;
     }
 }
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 9c360467..a4d1a8b8 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.connector.kafka.testutils.DockerImageVersions;
 import org.apache.flink.connector.kafka.testutils.KafkaUtil;
+import org.apache.flink.connector.kafka.testutils.TestKafkaContainer;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.apache.kafka.clients.admin.AdminClient;
@@ -31,10 +32,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.DockerClientFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.Network;
-import org.testcontainers.utility.DockerImageName;
 
 import javax.annotation.Nullable;
 
@@ -58,28 +56,20 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
 
     protected static final Logger LOG = 
LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
 
-    private static final String ZOOKEEPER_HOSTNAME = "zookeeper";
-    private static final int ZOOKEEPER_PORT = 2181;
-
-    private final Map<Integer, KafkaContainer> brokers = new HashMap<>();
+    private final Map<Integer, TestKafkaContainer> brokers = new HashMap<>();
     private final Set<Integer> pausedBroker = new HashSet<>();
-    private @Nullable GenericContainer<?> zookeeper;
     private @Nullable Network network;
     private String brokerConnectionString = "";
     private Properties standardProps;
-    // 6 seconds is default. Seems to be too small for travis. 30 seconds
-    private int zkTimeout = 30000;
     private Config config;
     private static final int REQUEST_TIMEOUT_SECONDS = 30;
 
     @Override
     public void prepare(Config config) throws Exception {
-        // increase the timeout since in Travis ZK connection takes long time for secure connection.
         if (config.isSecureMode()) {
-            // run only one kafka server to avoid multiple ZK connections from many instances -
-            // Travis timeout
+            // run only one kafka server to avoid multiple connections from many instances - Travis
+            // timeout
             config.setKafkaServersNumber(1);
-            zkTimeout = zkTimeout * 15;
         }
         this.config = config;
         brokers.clear();
@@ -92,8 +82,6 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
         standardProps.setProperty("bootstrap.servers", brokerConnectionString);
         standardProps.setProperty("group.id", "flink-tests");
         standardProps.setProperty("enable.auto.commit", "false");
-        standardProps.setProperty("zookeeper.session.timeout.ms", 
String.valueOf(zkTimeout));
-        standardProps.setProperty("zookeeper.connection.timeout.ms", 
String.valueOf(zkTimeout));
         standardProps.setProperty("auto.offset.reset", "earliest"); // read 
from the beginning.
         standardProps.setProperty(
                 "max.partition.fetch.bytes",
@@ -198,10 +186,6 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
             prop.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
             prop.put("security.protocol", "SASL_PLAINTEXT");
             prop.put("sasl.kerberos.service.name", "kafka");
-
-            // add special timeout for Travis
-            prop.setProperty("zookeeper.session.timeout.ms", 
String.valueOf(zkTimeout));
-            prop.setProperty("zookeeper.connection.timeout.ms", 
String.valueOf(zkTimeout));
             prop.setProperty("metadata.fetch.timeout.ms", "120000");
         }
         return prop;
@@ -251,13 +235,9 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
 
     @Override
     public void shutdown() throws Exception {
-        brokers.values().forEach(GenericContainer::stop);
+        brokers.values().forEach(TestKafkaContainer::stop);
         brokers.clear();
 
-        if (zookeeper != null) {
-            zookeeper.stop();
-        }
-
         if (network != null) {
             network.close();
         }
@@ -307,62 +287,34 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
     }
 
     private void startKafkaContainerCluster(int numBrokers) {
-        if (numBrokers > 1) {
-            network = Network.newNetwork();
-            zookeeper = createZookeeperContainer(network);
-            zookeeper.start();
-            LOG.info("Zookeeper container started");
-        }
+        network = Network.newNetwork();
         for (int brokerID = 0; brokerID < numBrokers; brokerID++) {
-            KafkaContainer broker = createKafkaContainer(brokerID, zookeeper);
+            TestKafkaContainer broker = createKafkaContainer(brokerID);
             brokers.put(brokerID, broker);
         }
-        new ArrayList<>(brokers.values()).parallelStream().forEach(GenericContainer::start);
+        new ArrayList<>(brokers.values()).parallelStream().forEach(TestKafkaContainer::start);
         LOG.info("{} brokers started", numBrokers);
         brokerConnectionString =
                 brokers.values().stream()
-                        .map(KafkaContainer::getBootstrapServers)
-                        // Here we have URL like "PLAINTEXT://127.0.0.1:15213", and we only keep the
-                        // "127.0.0.1:15213" part in broker connection string
-                        .map(server -> server.split("://")[1])
+                        .map(TestKafkaContainer::getBootstrapServers)
+                        // Here we have URL like "PLAINTEXT://127.0.0.1:15213" (Confluent Kafka),
+                        // and we only keep the "127.0.0.1:15213" part.
+                        // Apache Kafka may return "host:port" directly without protocol prefix,
+                        // so we handle both cases.
+                        .map(server -> server.contains("://") ? server.split("://", 2)[1] : server)
                         .collect(Collectors.joining(","));
+        LOG.info("Broker connection string: {}", brokerConnectionString);
     }
 
-    private GenericContainer<?> createZookeeperContainer(Network network) {
-        return new GenericContainer<>(DockerImageName.parse(DockerImageVersions.ZOOKEEPER))
-                .withNetwork(network)
-                .withNetworkAliases(ZOOKEEPER_HOSTNAME)
-                .withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(ZOOKEEPER_PORT));
-    }
-
-    private KafkaContainer createKafkaContainer(
-            int brokerID, @Nullable GenericContainer<?> zookeeper) {
+    private TestKafkaContainer createKafkaContainer(int brokerID) {
         String brokerName = String.format("Kafka-%d", brokerID);
-        KafkaContainer broker =
-                KafkaUtil.createKafkaContainer(brokerName)
-                        .withNetworkAliases(brokerName)
-                        .withEnv("KAFKA_BROKER_ID", String.valueOf(brokerID))
-                        .withEnv("KAFKA_MESSAGE_MAX_BYTES", String.valueOf(50 * 1024 * 1024))
-                        .withEnv("KAFKA_REPLICA_FETCH_MAX_BYTES", String.valueOf(50 * 1024 * 1024))
-                        .withEnv(
-                                "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
-                                Integer.toString(1000 * 60 * 60 * 2))
-                        // Disable log deletion to prevent records from being deleted during test
-                        // run
-                        .withEnv("KAFKA_LOG_RETENTION_MS", "-1")
-                        .withEnv("KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS", String.valueOf(zkTimeout))
-                        .withEnv(
-                                "KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS", String.valueOf(zkTimeout));
-
-        if (zookeeper != null) {
-            broker.dependsOn(zookeeper)
-                    .withNetwork(zookeeper.getNetwork())
-                    .withExternalZookeeper(
-                            String.format("%s:%d", ZOOKEEPER_HOSTNAME, ZOOKEEPER_PORT));
-        } else {
-            broker.withEmbeddedZookeeper();
-        }
-        return broker;
+        return KafkaUtil.createKafkaContainer(brokerName)
+                .withNetworkAliases(brokerName)
+                .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
+                .withEnv("KAFKA_MESSAGE_MAX_BYTES", String.valueOf(50 * 1024 * 1024))
+                .withEnv("KAFKA_REPLICA_FETCH_MAX_BYTES", String.valueOf(50 * 1024 * 1024))
+                .withEnv("KAFKA_TRANSACTION_MAX_TIMEOUT_MS", Integer.toString(1000 * 60 * 60 * 2))
+                .withEnv("KAFKA_LOG_RETENTION_MS", "-1");
     }
 
     private void pause(int brokerId) {
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapperTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapperTest.java
index c0ea690d..0d516c28 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapperTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapperTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.internals.metrics;
 
+import org.apache.flink.connector.kafka.testutils.TestKafkaContainer;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.util.TestLoggerExtension;
 
@@ -28,7 +29,6 @@ import 
org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.Network;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
@@ -50,9 +50,8 @@ class KafkaMetricMutableWrapperTest {
     private static final Network NETWORK = Network.newNetwork();
 
     @Container
-    public static final KafkaContainer KAFKA_CONTAINER =
+    public static final TestKafkaContainer KAFKA_CONTAINER =
             createKafkaContainer(KafkaMetricMutableWrapperTest.class)
-                    .withEmbeddedZookeeper()
                     .withNetwork(NETWORK)
                     .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
 
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
index 5999df09..85e20cab 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka.table;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.connector.kafka.testutils.KafkaUtil;
+import org.apache.flink.connector.kafka.testutils.TestKafkaContainer;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.test.util.AbstractTestBase;
@@ -41,7 +42,6 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
@@ -66,9 +66,8 @@ abstract class KafkaTableTestBase extends AbstractTestBase {
     private static final int zkTimeoutMills = 30000;
 
     @Container
-    public static final KafkaContainer KAFKA_CONTAINER =
+    public static final TestKafkaContainer KAFKA_CONTAINER =
             KafkaUtil.createKafkaContainer(KafkaTableTestBase.class)
-                    .withEmbeddedZookeeper()
                     .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS)
                     .withEnv(
                             "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",

Reply via email to