chia7712 commented on code in PR #17666:
URL: https://github.com/apache/kafka/pull/17666#discussion_r1834636306
##########
test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java:
##########
@@ -147,10 +155,44 @@ default <T> T getUnderlying(Class<T> asClass) {
return asClass.cast(getUnderlying());
}
- Admin createAdminClient(Properties configOverrides);
+
//---------------------------[producer/consumer/admin]---------------------------//
- default Admin createAdminClient() {
- return createAdminClient(new Properties());
+ default <K, V> Producer<K, V> producer(Map<String, Object> configs) {
+ Properties props = new Properties();
Review Comment:
We can use `Map` instead of `Properties`.
##########
test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java:
##########
@@ -147,10 +155,44 @@ default <T> T getUnderlying(Class<T> asClass) {
return asClass.cast(getUnderlying());
}
- Admin createAdminClient(Properties configOverrides);
+
//---------------------------[producer/consumer/admin]---------------------------//
- default Admin createAdminClient() {
- return createAdminClient(new Properties());
+ default <K, V> Producer<K, V> producer(Map<String, Object> configs) {
+ Properties props = new Properties();
+ props.putAll(configs);
+ props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "-1");
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+ return new KafkaProducer<>(props);
+ }
+
+ default <K, V> Consumer<K, V> consumer(Map<String, Object> configs) {
+ Properties props = new Properties();
+ props.putAll(configs);
+ props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" +
TestUtils.randomString(5));
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
Review Comment:
Could you please use `putIfAbsent` for consistency?
##########
test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java:
##########
@@ -147,10 +155,44 @@ default <T> T getUnderlying(Class<T> asClass) {
return asClass.cast(getUnderlying());
}
- Admin createAdminClient(Properties configOverrides);
+
//---------------------------[producer/consumer/admin]---------------------------//
- default Admin createAdminClient() {
- return createAdminClient(new Properties());
+ default <K, V> Producer<K, V> producer(Map<String, Object> configs) {
Review Comment:
Could you please add a `producer()` helper?
##########
test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java:
##########
@@ -147,10 +155,44 @@ default <T> T getUnderlying(Class<T> asClass) {
return asClass.cast(getUnderlying());
}
- Admin createAdminClient(Properties configOverrides);
+
//---------------------------[producer/consumer/admin]---------------------------//
- default Admin createAdminClient() {
- return createAdminClient(new Properties());
+ default <K, V> Producer<K, V> producer(Map<String, Object> configs) {
+ Properties props = new Properties();
+ props.putAll(configs);
Review Comment:
Could you please add a default byte[] serializer if callers do not define one?
##########
test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java:
##########
@@ -147,10 +155,44 @@ default <T> T getUnderlying(Class<T> asClass) {
return asClass.cast(getUnderlying());
}
- Admin createAdminClient(Properties configOverrides);
+
//---------------------------[producer/consumer/admin]---------------------------//
- default Admin createAdminClient() {
- return createAdminClient(new Properties());
+ default <K, V> Producer<K, V> producer(Map<String, Object> configs) {
+ Properties props = new Properties();
+ props.putAll(configs);
+ props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "-1");
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+ return new KafkaProducer<>(props);
+ }
+
+ default <K, V> Consumer<K, V> consumer(Map<String, Object> configs) {
+ Properties props = new Properties();
+ props.putAll(configs);
+ props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" +
TestUtils.randomString(5));
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+ return new KafkaConsumer<>(props);
+ }
+
+ default Admin admin(Map<String, Object> configs, boolean
usingBootstrapControllers) {
+ Properties props = new Properties();
Review Comment:
Ditto: we can use `Map` instead of `Properties` here as well.
##########
test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java:
##########
@@ -147,10 +155,44 @@ default <T> T getUnderlying(Class<T> asClass) {
return asClass.cast(getUnderlying());
}
- Admin createAdminClient(Properties configOverrides);
+
//---------------------------[producer/consumer/admin]---------------------------//
- default Admin createAdminClient() {
- return createAdminClient(new Properties());
+ default <K, V> Producer<K, V> producer(Map<String, Object> configs) {
+ Properties props = new Properties();
+ props.putAll(configs);
+ props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "-1");
Review Comment:
Could you please remove this? It would be better to add it only once we have
observed many similar use cases.
##########
test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java:
##########
@@ -147,10 +155,44 @@ default <T> T getUnderlying(Class<T> asClass) {
return asClass.cast(getUnderlying());
}
- Admin createAdminClient(Properties configOverrides);
+
//---------------------------[producer/consumer/admin]---------------------------//
- default Admin createAdminClient() {
- return createAdminClient(new Properties());
+ default <K, V> Producer<K, V> producer(Map<String, Object> configs) {
+ Properties props = new Properties();
+ props.putAll(configs);
+ props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "-1");
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+ return new KafkaProducer<>(props);
+ }
+
+ default <K, V> Consumer<K, V> consumer(Map<String, Object> configs) {
+ Properties props = new Properties();
+ props.putAll(configs);
+ props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Review Comment:
Could you please add a default byte[] deserializer if callers do not define one?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]