This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5ad532f4ad0 KAFKA-18075 Prevent ClusterInstance default producer and consumer initialization with empty configs (#17926)
5ad532f4ad0 is described below
commit 5ad532f4ad040e94346d1c76e0729c717423b9ad
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Sun Nov 24 15:07:56 2024 +0800
KAFKA-18075 Prevent ClusterInstance default producer and consumer initialization with empty configs (#17926)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/common/test/api/ClusterInstance.java | 4 +--
.../common/test/api/ClusterTestExtensionsTest.java | 30 ++++++++++++++++++++++
2 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
index 96ba599450b..b64e6d1a000 100644
--- a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
+++ b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
@@ -166,7 +166,7 @@ public interface ClusterInstance {
}
default <K, V> Producer<K, V> producer() {
- return new KafkaProducer<>(Map.of());
+ return producer(Map.of());
}
default <K, V> Consumer<K, V> consumer(Map<String, Object> configs) {
@@ -180,7 +180,7 @@ public interface ClusterInstance {
}
default <K, V> Consumer<K, V> consumer() {
- return new KafkaConsumer<>(Map.of());
+ return consumer(Map.of());
}
default Admin admin(Map<String, Object> configs, boolean
usingBootstrapControllers) {
diff --git a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
index 7fa28e64580..b96e8c28c08 100644
--- a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
+++ b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
@@ -32,6 +32,8 @@ import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Assertions;
@@ -292,4 +294,32 @@ public class ClusterTestExtensionsTest {
assertEquals(value, records.get(0).value());
}
}
+
+ @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, serverProperties = {
+ @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ })
+ public void testCreateDefaultProducerAndConsumer(ClusterInstance cluster) throws InterruptedException {
+ String topic = "topic";
+ Bytes key = Bytes.wrap("key".getBytes());
+ Bytes value = Bytes.wrap("value".getBytes());
+ try (Admin adminClient = cluster.admin();
+ Producer<Bytes, Bytes> producer = cluster.producer();
+ Consumer<Bytes, Bytes> consumer = cluster.consumer()
+ ) {
+ adminClient.createTopics(singleton(new NewTopic(topic, 1, (short) 1)));
+ assertNotNull(producer);
+ assertNotNull(consumer);
+ producer.send(new ProducerRecord<>(topic, key, value));
+ producer.flush();
+ consumer.subscribe(singletonList(topic));
+ List<ConsumerRecord<Bytes, Bytes>> records = new ArrayList<>();
+ TestUtils.waitForCondition(() -> {
+ consumer.poll(Duration.ofMillis(100)).forEach(records::add);
+ return records.size() == 1;
+ }, "Failed to receive message");
+ assertEquals(key, records.get(0).key());
+ assertEquals(value, records.get(0).value());
+ }
+ }
}