This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5ad532f4ad0 KAFKA-18075 Prevent ClusterInstance default producer and consumer initialization with empty configs (#17926)
5ad532f4ad0 is described below

commit 5ad532f4ad040e94346d1c76e0729c717423b9ad
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Sun Nov 24 15:07:56 2024 +0800

    KAFKA-18075 Prevent ClusterInstance default producer and consumer initialization with empty configs (#17926)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/common/test/api/ClusterInstance.java     |  4 +--
 .../common/test/api/ClusterTestExtensionsTest.java | 30 ++++++++++++++++++++++
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
index 96ba599450b..b64e6d1a000 100644
--- a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
+++ b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
@@ -166,7 +166,7 @@ public interface ClusterInstance {
     }
 
     default <K, V> Producer<K, V> producer() {
-        return new KafkaProducer<>(Map.of());
+        return producer(Map.of());
     }
 
     default <K, V> Consumer<K, V> consumer(Map<String, Object> configs) {
@@ -180,7 +180,7 @@ public interface ClusterInstance {
     }
 
     default <K, V> Consumer<K, V> consumer() {
-        return new KafkaConsumer<>(Map.of());
+        return consumer(Map.of());
     }
 
     default Admin admin(Map<String, Object> configs, boolean usingBootstrapControllers) {
diff --git a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
index 7fa28e64580..b96e8c28c08 100644
--- a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
+++ b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
@@ -32,6 +32,8 @@ import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import org.junit.jupiter.api.Assertions;
@@ -292,4 +294,32 @@ public class ClusterTestExtensionsTest {
             assertEquals(value, records.get(0).value());
         }
     }
+
+    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, serverProperties = {
+        @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+        @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    })
+    public void testCreateDefaultProducerAndConsumer(ClusterInstance cluster) throws InterruptedException {
+        String topic = "topic";
+        Bytes key = Bytes.wrap("key".getBytes());
+        Bytes value = Bytes.wrap("value".getBytes());
+        try (Admin adminClient = cluster.admin();
+             Producer<Bytes, Bytes> producer = cluster.producer();
+             Consumer<Bytes, Bytes> consumer = cluster.consumer()
+        ) {
+            adminClient.createTopics(singleton(new NewTopic(topic, 1, (short) 1)));
+            assertNotNull(producer);
+            assertNotNull(consumer);
+            producer.send(new ProducerRecord<>(topic, key, value));
+            producer.flush();
+            consumer.subscribe(singletonList(topic));
+            List<ConsumerRecord<Bytes, Bytes>> records = new ArrayList<>();
+            TestUtils.waitForCondition(() -> {
+                consumer.poll(Duration.ofMillis(100)).forEach(records::add);
+                return records.size() == 1;
+            }, "Failed to receive message");
+            assertEquals(key, records.get(0).key());
+            assertEquals(value, records.get(0).value());
+        }
+    }
 }

Reply via email to