This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 160ee7f0ead6147a8302f0bd157fb0f20bd0d9b3
Author: Aleksandr Savonin <[email protected]>
AuthorDate: Sun Jan 18 14:50:26 2026 +0100

    [FLINK-38937] Introduce TestKafkaContainer wrapper for Apache and Confluent Kafka images
    
    This wrapper automatically selects between KafkaContainer (for Apache Kafka
    images) and ConfluentKafkaContainer (for Confluent Platform images) based on
    the Docker image name, replacing the deprecated testcontainers KafkaContainer
    class.
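
    A minimal usage sketch (illustrative only; the try-with-resources pattern and
    image choice below are not taken from the new tests, and the TestKafkaContainer
    API is as introduced in this commit):

        // The wrapper picks the matching Testcontainers implementation from the
        // image name: apache/kafka:* -> KafkaContainer,
        // confluentinc/* -> ConfluentKafkaContainer.
        try (TestKafkaContainer kafka =
                new TestKafkaContainer(DockerImageVersions.APACHE_KAFKA)) {
            kafka.start();
            String bootstrapServers = kafka.getBootstrapServers();
            // ... point Kafka clients or a Flink job at bootstrapServers ...
        } // close() delegates to stop(), so no container is left running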
---
 .../kafka/testutils/DockerImageVersions.java       |   1 +
 .../FlinkKafkaIntegrationCompatibilityTest.java    | 250 ++++++++++++++++++
 .../kafka/testutils/TestKafkaContainer.java        | 292 +++++++++++++++++++++
 .../TestKafkaContainerValidationTest.java          |  47 ++++
 4 files changed, 590 insertions(+)

diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DockerImageVersions.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DockerImageVersions.java
index 54ffc551..3704f417 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DockerImageVersions.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/DockerImageVersions.java
@@ -25,6 +25,7 @@ package org.apache.flink.connector.kafka.testutils;
 public class DockerImageVersions {
 
     public static final String KAFKA = "confluentinc/cp-kafka:7.9.2";
+    public static final String APACHE_KAFKA = "apache/kafka:4.1.1";
 
     public static final String SCHEMA_REGISTRY = "confluentinc/cp-schema-registry:7.9.2";
 
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/FlinkKafkaIntegrationCompatibilityTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/FlinkKafkaIntegrationCompatibilityTest.java
new file mode 100644
index 00000000..925dd6b0
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/FlinkKafkaIntegrationCompatibilityTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.testutils;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.CloseableIterator;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.flink.connector.kafka.testutils.DockerImageVersions.APACHE_KAFKA;
+import static org.apache.flink.connector.kafka.testutils.DockerImageVersions.KAFKA;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test to verify that the Flink Kafka connector works with both Confluent Platform
+ * and Apache Kafka images.
+ *
+ * <p>This test validates the actual integration between Flink and Kafka by running real Flink jobs
+ * that read from and write to Kafka.
+ */
+class FlinkKafkaIntegrationCompatibilityTest {
+
+    private TestKafkaContainer kafkaContainer;
+    private AdminClient adminClient;
+
+    @AfterEach
+    void tearDown() {
+        if (adminClient != null) {
+            adminClient.close();
+        }
+        if (kafkaContainer != null) {
+            kafkaContainer.stop();
+        }
+    }
+
+    /**
+     * Tests Flink KafkaSource integration by reading records from Kafka and verifying the sum of
+     * values.
+     *
+     * <p>This is adapted from {@code KafkaSourceITCase.testValueOnlyDeserializer}.
+     */
+    @ParameterizedTest
+    @ValueSource(strings = {KAFKA, APACHE_KAFKA})
+    void testFlinkKafkaSourceIntegration(String dockerImage) throws Exception {
+        // Start Kafka container
+        kafkaContainer = new TestKafkaContainer(dockerImage);
+        kafkaContainer.start();
+
+        String topic1 = "test-source-topic1-" + UUID.randomUUID();
+        String topic2 = "test-source-topic2-" + UUID.randomUUID();
+        String bootstrapServers = kafkaContainer.getBootstrapServers();
+        int numPartitions = 4;
+        int numRecordsPerPartition = 5;
+
+        // Create topics
+        Map<String, Object> adminConfig = new HashMap<>();
+        adminConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        adminClient = AdminClient.create(adminConfig);
+        adminClient
+                .createTopics(
+                        Arrays.asList(
+                                new NewTopic(topic1, numPartitions, (short) 1),
+                                new NewTopic(topic2, numPartitions, (short) 1)))
+                .all()
+                .get();
+
+        // Produce test data to both topics
+        // Values in partition N should be {N, N+1, N+2, ..., numRecordsPerPartition-1}
+        Map<String, Object> producerConfig = new HashMap<>();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        producerConfig.put(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
+        producerConfig.put(
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
+
+        try (KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerConfig)) {
+            for (String topic : Arrays.asList(topic1, topic2)) {
+                for (int partition = 0; partition < numPartitions; partition++) {
+                    for (int value = partition; value < numRecordsPerPartition; value++) {
+                        producer.send(new ProducerRecord<>(topic, partition, partition, value))
+                                .get();
+                    }
+                }
+            }
+        }
+
+        // Create Flink KafkaSource
+        KafkaSource<Integer> source =
+                KafkaSource.<Integer>builder()
+                        .setBootstrapServers(bootstrapServers)
+                        .setGroupId("testFlinkKafkaSource-" + UUID.randomUUID())
+                        .setTopics(Arrays.asList(topic1, topic2))
+                        .setDeserializer(
+                                KafkaRecordDeserializationSchema.valueOnly(
+                                        IntegerDeserializer.class))
+                        .setStartingOffsets(OffsetsInitializer.earliest())
+                        .setBounded(OffsetsInitializer.latest())
+                        .build();
+
+        // Execute Flink job and collect results
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        int actualSum = 0;
+        try (CloseableIterator<Integer> resultIterator =
+                env.fromSource(source, WatermarkStrategy.noWatermarks(), "testFlinkKafkaSource")
+                        .executeAndCollect()) {
+            while (resultIterator.hasNext()) {
+                actualSum += resultIterator.next();
+            }
+        }
+
+        // Calculate expected sum
+        // Each partition N has values: N, N+1, ..., numRecordsPerPartition-1
+        int expectedSum = 0;
+        for (int partition = 0; partition < numPartitions; partition++) {
+            for (int value = partition; value < numRecordsPerPartition; value++) {
+                expectedSum += value;
+            }
+        }
+        // Two topics, so double the sum
+        expectedSum *= 2;
+
+        assertThat(actualSum)
+                .as(
+                        "Flink should read and sum all values correctly from Kafka with image: %s",
+                        dockerImage)
+                .isEqualTo(expectedSum);
+    }
+
+    /**
+     * Tests Flink KafkaSink integration by writing records to Kafka and verifying they are
+     * persisted.
+     *
+     * <p>This is adapted from {@code KafkaSinkITCase.testWriteRecordsToKafkaWithNoneGuarantee}.
+     */
+    @ParameterizedTest
+    @ValueSource(strings = {KAFKA, APACHE_KAFKA})
+    void testFlinkKafkaSinkIntegration(String dockerImage) throws Exception {
+        // Start Kafka container
+        kafkaContainer = new TestKafkaContainer(dockerImage);
+        kafkaContainer.start();
+
+        String topic = "test-sink-topic-" + UUID.randomUUID();
+        String bootstrapServers = kafkaContainer.getBootstrapServers();
+        int numRecords = 100;
+
+        // Create topic
+        Map<String, Object> adminConfig = new HashMap<>();
+        adminConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        adminClient = AdminClient.create(adminConfig);
+        adminClient
+                .createTopics(Collections.singleton(new NewTopic(topic, 1, (short) 1)))
+                .all()
+                .get();
+
+        // Create Flink KafkaSink and write records
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        DataStream<String> source = env.fromSequence(0, numRecords - 1).map(i -> "record-" + i);
+
+        source.sinkTo(
+                KafkaSink.<String>builder()
+                        .setBootstrapServers(bootstrapServers)
+                        .setRecordSerializer(
+                                KafkaRecordSerializationSchema.builder()
+                                        .setTopic(topic)
+                                        .setValueSerializationSchema(new SimpleStringSchema())
+                                        .build())
+                        .build());
+
+        env.execute("testFlinkKafkaSink");
+
+        // Verify records were written to Kafka
+        Map<String, Object> consumerConfig = new HashMap<>();
+        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + UUID.randomUUID());
+        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerConfig.put(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerConfig.put(
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+
+        List<String> receivedRecords = new ArrayList<>();
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig)) {
+            consumer.subscribe(Collections.singleton(topic));
+
+            long startTime = System.currentTimeMillis();
+            while (receivedRecords.size() < numRecords
+                    && System.currentTimeMillis() - startTime < 30000) {
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+                for (ConsumerRecord<String, String> record : records) {
+                    receivedRecords.add(record.value());
+                }
+            }
+        }
+
+        assertThat(receivedRecords)
+                .as("Flink should write all records to Kafka with image: %s", dockerImage)
+                .hasSize(numRecords)
+                .allMatch(record -> record.startsWith("record-"));
+    }
+}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/TestKafkaContainer.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/TestKafkaContainer.java
new file mode 100644
index 00000000..98122654
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/TestKafkaContainer.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.testutils;
+
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.OutputFrame;
+import org.testcontainers.kafka.ConfluentKafkaContainer;
+import org.testcontainers.kafka.KafkaContainer;
+import org.testcontainers.lifecycle.Startable;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.function.Consumer;
+
+/**
+ * A wrapper around Kafka test containers that automatically selects between {@link KafkaContainer}
+ * (for Apache Kafka images) and {@link ConfluentKafkaContainer} (for Confluent Platform images)
+ * based on the image name.
+ *
+ * <p>The wrapper configures Kafka in KRaft mode (without Zookeeper) with a multi-listener
+ * architecture:
+ *
+ * <ul>
+ *   <li>PLAINTEXT listener (port 9092) for host access via localhost
+ *   <li>BROKER listener (port 9093) for container-to-container communication
+ *   <li>CONTROLLER listener (port 9094) for KRaft consensus
+ * </ul>
+ *
+ * <p>The wrapper delegates all operations to the underlying testcontainers implementation.
+ *
+ * <p>This class implements both {@link Startable} (for JUnit 5's {@code @Container}) and {@link
+ * TestRule} (for JUnit 4's {@code @ClassRule}), ensuring proper lifecycle management and preventing
+ * orphan containers in both testing frameworks.
+ */
+public class TestKafkaContainer implements AutoCloseable, Startable, TestRule {
+
+    private final GenericContainer<?> delegate;
+    private final boolean isConfluentImage;
+    private String networkAlias;
+
+    public TestKafkaContainer(String imageName) {
+        this(DockerImageName.parse(imageName));
+    }
+
+    public TestKafkaContainer(DockerImageName dockerImageName) {
+        // Validate that the image is supported
+        if (isConfluentImage(dockerImageName)) {
+            this.isConfluentImage = true;
+            // Create ConfluentKafkaContainer with Confluent-specific configuration
+            // The advertised listeners will be configured in start() when the network alias is known
+            this.delegate =
+                    new ConfluentKafkaContainer(dockerImageName)
+                            .withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
+                            .withEnv(
+                                    "KAFKA_LISTENERS",
+                                    "PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094")
+                            .withEnv(
+                                    "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
+                                    "PLAINTEXT:PLAINTEXT,BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT")
+                            .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
+        } else if (isApacheKafkaImage(dockerImageName)) {
+            this.isConfluentImage = false;
+            // Apache Kafka images use the new KafkaContainer from testcontainers
+            // which provides native KRaft support
+            this.delegate = new KafkaContainer(dockerImageName);
+        } else {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Unsupported Kafka image: %s. "
+                                    + "TestKafkaContainer currently supports only Confluent Platform (confluentinc/*) "
+                                    + "and Apache Kafka (apache/kafka) images.",
+                            dockerImageName.asCanonicalNameString()));
+        }
+
+        applyCommonConfiguration();
+    }
+
+    /**
+     * Checks if the provided Docker image is a Confluent Platform image.
+     *
+     * @param dockerImageName the Docker image name to check
+     * @return true if the image is a Confluent Platform image, false otherwise
+     */
+    private static boolean isConfluentImage(DockerImageName dockerImageName) {
+        String repository = dockerImageName.getRepository().toLowerCase();
+        return repository.startsWith("confluentinc/");
+    }
+
+    /**
+     * Checks if the provided Docker image is an Apache Kafka image.
+     *
+     * @param dockerImageName the Docker image name to check
+     * @return true if the image is an Apache Kafka image, false otherwise
+     */
+    private static boolean isApacheKafkaImage(DockerImageName dockerImageName) {
+        String repository = dockerImageName.getRepository().toLowerCase();
+        return repository.startsWith("apache/kafka");
+    }
+
+    /**
+     * Applies common Kafka configuration that is shared between both Confluent and Apache Kafka
+     * containers.
+     */
+    private void applyCommonConfiguration() {
+        delegate.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
+                .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
+                .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
+                .withEnv(
+                        "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
+                        String.valueOf(Duration.ofHours(2).toMillis()));
+    }
+
+    /**
+     * Returns the bootstrap servers connection string for connecting to Kafka.
+     *
+     * @return the bootstrap servers in the format {@code host:port}
+     */
+    public String getBootstrapServers() {
+        return isConfluentImage
+                ? ((ConfluentKafkaContainer) delegate).getBootstrapServers()
+                : ((KafkaContainer) delegate).getBootstrapServers();
+    }
+
+    /** Starts the container. Required for @Container support. */
+    @Override
+    public void start() {
+        if (networkAlias != null && isConfluentImage) {
+            delegate.withEnv(
+                    "KAFKA_ADVERTISED_LISTENERS",
+                    "PLAINTEXT://localhost:9092,BROKER://" + networkAlias + ":9093");
+        }
+        // Apache Kafka container handles network aliases automatically through withNetworkAliases()
+        // No additional configuration needed here
+        delegate.start();
+    }
+
+    /** Stops the container. Required for correct @Container support. */
+    @Override
+    public void stop() {
+        delegate.stop();
+    }
+
+    /**
+     * Implements TestRule for JUnit 4 @ClassRule support. This ensures proper lifecycle management
+     * when used with JUnit 4 tests.
+     */
+    @Override
+    public Statement apply(Statement base, Description description) {
+        return new Statement() {
+            @Override
+            public void evaluate() throws Throwable {
+                start();
+                try {
+                    base.evaluate();
+                } finally {
+                    stop();
+                }
+            }
+        };
+    }
+
+    /** Closes the container. Delegates to {@link #stop()}. */
+    @Override
+    public void close() {
+        stop();
+    }
+
+    /**
+     * Sets an environment variable in the container.
+     *
+     * @param key the environment variable name
+     * @param value the environment variable value
+     * @return this {@link TestKafkaContainer} instance for method chaining
+     */
+    public TestKafkaContainer withEnv(String key, String value) {
+        delegate.withEnv(key, value);
+        return this;
+    }
+
+    /**
+     * Attaches the container to a network.
+     *
+     * @param network the network to attach to
+     * @return this {@link TestKafkaContainer} instance for method chaining
+     */
+    public TestKafkaContainer withNetwork(Network network) {
+        delegate.withNetwork(network);
+        return this;
+    }
+
+    /**
+     * Sets network aliases for the container.
+     *
+     * @param aliases the network aliases
+     * @return this {@link TestKafkaContainer} instance for method chaining
+     */
+    public TestKafkaContainer withNetworkAliases(String... aliases) {
+        delegate.withNetworkAliases(aliases);
+        // Store the first network alias for later use in configuring advertised listeners
+        if (aliases.length > 0) {
+            this.networkAlias = aliases[0];
+        }
+        return this;
+    }
+
+    /**
+     * Adds a log consumer to receive container log output.
+     *
+     * @param consumer the log consumer
+     * @return this {@link TestKafkaContainer} instance for method chaining
+     */
+    public TestKafkaContainer withLogConsumer(Consumer<OutputFrame> consumer) {
+        delegate.withLogConsumer(consumer);
+        return this;
+    }
+
+    /**
+     * Returns the network aliases for this container.
+     *
+     * @return the list of network aliases
+     */
+    public List<String> getNetworkAliases() {
+        return delegate.getNetworkAliases();
+    }
+
+    /**
+     * Sets Kafka logging level. Currently only supported for Confluent Platform images. Apache
+     * Kafka images use different logging configuration and this setting will be ignored.
+     *
+     * @param logLevel the log level (TRACE, DEBUG, INFO, WARN, ERROR, OFF)
+     * @return this {@link TestKafkaContainer} instance for method chaining
+     */
+    public TestKafkaContainer withKafkaLogLevel(String logLevel) {
+        if (isConfluentImage) {
+            delegate.withEnv("KAFKA_LOG4J_ROOT_LOGLEVEL", logLevel);
+            delegate.withEnv("KAFKA_LOG4J_LOGGERS", "state.change.logger=" + logLevel);
+            delegate.withEnv("KAFKA_LOG4J_TOOLS_ROOT_LOGLEVEL", logLevel);
+        }
+        return this;
+    }
+
+    /**
+     * Checks if the container is currently running.
+     *
+     * @return true if the container is running, false otherwise
+     */
+    public Boolean isRunning() {
+        return delegate.isRunning();
+    }
+
+    /**
+     * Returns the container ID.
+     *
+     * @return the container ID
+     */
+    public String getContainerId() {
+        return delegate.getContainerId();
+    }
+
+    /**
+     * Returns the underlying GenericContainer for advanced use cases.
+     *
+     * <p>This is useful when you need to pass the container to APIs that require {@code
+     * GenericContainer<?>} type, such as test frameworks or other testcontainers utilities.
+     *
+     * @return the underlying container instance (either {@link KafkaContainer} or {@link
+     *     ConfluentKafkaContainer})
+     */
+    public GenericContainer<?> getContainer() {
+        return delegate;
+    }
+}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/TestKafkaContainerValidationTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/TestKafkaContainerValidationTest.java
new file mode 100644
index 00000000..38d8a81a
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/TestKafkaContainerValidationTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.testutils;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Unit tests for TestKafkaContainer validation logic. */
+class TestKafkaContainerValidationTest {
+
+    @Test
+    void testConfluentImageIsAccepted() {
+        assertThatCode(() -> new TestKafkaContainer(DockerImageVersions.KAFKA))
+                .doesNotThrowAnyException();
+    }
+
+    @Test
+    void testApacheKafkaImageIsAccepted() {
+        assertThatCode(() -> new TestKafkaContainer(DockerImageVersions.APACHE_KAFKA))
+                .doesNotThrowAnyException();
+    }
+
+    @Test
+    void testUnsupportedImageIsRejected() {
+        assertThatThrownBy(() -> new TestKafkaContainer("mycompany/my-kafka:1.0"))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Unsupported Kafka image")
+                .hasMessageContaining("mycompany/my-kafka:1.0");
+    }
+}
