This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2ba30cc466f KAFKA-19574: Improve producer and consumer config files (#20302)
2ba30cc466f is described below

commit 2ba30cc466fd854b6ed7fa2afd157a80fcf7382a
Author: Federico Valeri <[email protected]>
AuthorDate: Tue Sep 2 04:24:35 2025 +0200

    KAFKA-19574: Improve producer and consumer config files (#20302)
    
    This is an attempt at improving the client configuration files. We now
    have sections and comments similar to the other properties files.
    
    Reviewers: Kirk True <[email protected]>, Luke Chen <[email protected]>
    
    ---------
    
    Signed-off-by: Federico Valeri <[email protected]>
---
 .../kafka/clients/consumer/ConsumerConfigTest.java |  25 ++++
 .../kafka/clients/producer/ProducerConfigTest.java |  26 ++++
 config/consumer.properties                         | 130 ++++++++++++++++++--
 config/producer.properties                         | 134 +++++++++++++++++----
 4 files changed, 285 insertions(+), 30 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
index 2fa5515fb40..0e50d5d4698 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
@@ -30,6 +30,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 
+import java.io.FileInputStream;
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Locale;
@@ -41,6 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 public class ConsumerConfigTest {
 
@@ -256,4 +259,26 @@ public class ConsumerConfigTest {
         assertEquals(configName + " cannot be set when " +
                 ConsumerConfig.GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CONSUMER.name(), exception.getMessage());
     }
+
+    /**
+     * Validates config/consumer.properties file to avoid getting out of sync with ConsumerConfig.
+     */
+    @Test
+    public void testValidateConfigPropertiesFile() {
+        Properties props = new Properties();
+
+        try (InputStream inputStream = new FileInputStream(System.getProperty("user.dir") + "/../config/consumer.properties")) {
+            props.load(inputStream);
+        } catch (Exception e) {
+            fail("Failed to load config/consumer.properties file: " + 
e.getMessage());
+        }
+
+        ConsumerConfig config = new ConsumerConfig(props);
+
+        for (String key : config.originals().keySet()) {
+            if (!ConsumerConfig.configDef().configKeys().containsKey(key)) {
+                fail("Invalid configuration key: " + key);
+            }
+        }
+    }
 }
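
The same load-and-validate pattern used in the test above can be applied outside the test suite, for example to check a locally edited properties file before deploying it. A minimal sketch, assuming the file path is passed as the first argument (the ValidateConsumerProps class name is illustrative, not part of this commit; ConsumerConfig.configNames() is used here as the public view of the supported keys):

    import org.apache.kafka.clients.consumer.ConsumerConfig;

    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Properties;

    public class ValidateConsumerProps {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Load the candidate properties file passed on the command line.
            try (InputStream in = Files.newInputStream(Path.of(args[0]))) {
                props.load(in);
            }
            // Constructing the config validates the values of known keys.
            new ConsumerConfig(props);
            // Unknown keys are ignored by AbstractConfig, so flag them explicitly.
            for (String key : props.stringPropertyNames()) {
                if (!ConsumerConfig.configNames().contains(key)) {
                    System.err.println("Unknown configuration key: " + key);
                }
            }
        }
    }
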
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
index 207bac6476f..33f069c8523 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
@@ -26,14 +26,18 @@ import org.apache.kafka.common.serialization.StringSerializer;
 
 import org.junit.jupiter.api.Test;
 
+import java.io.FileInputStream;
+import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Properties;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 public class ProducerConfigTest {
 
@@ -168,4 +172,26 @@ public class ProducerConfigTest {
         configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, false);
         assertDoesNotThrow(() -> new ProducerConfig(configs));
     }
+
+    /**
+     * Validates config/producer.properties file to avoid getting out of sync with ProducerConfig.
+     */
+    @Test
+    public void testValidateConfigPropertiesFile() {
+        Properties props = new Properties();
+
+        try (InputStream inputStream = new FileInputStream(System.getProperty("user.dir") + "/../config/producer.properties")) {
+            props.load(inputStream);
+        } catch (Exception e) {
+            fail("Failed to load config/producer.properties file: " + 
e.getMessage());
+        }
+
+        ProducerConfig config = new ProducerConfig(props);
+
+        for (String key : config.originals().keySet()) {
+            if (!ProducerConfig.configDef().configKeys().containsKey(key)) {
+                fail("Invalid configuration key: " + key);
+            }
+        }
+    }
 }
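
Note that both new tests resolve the file via System.getProperty("user.dir") + "/../config/...", so they assume the test JVM's working directory is the clients module directory, which matches Gradle's default of running tests from the project directory. They should be runnable locally with something like ./gradlew clients:test --tests ConsumerConfigTest --tests ProducerConfigTest, though the exact module and filter syntax depends on the project's Gradle setup.
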
diff --git a/config/consumer.properties b/config/consumer.properties
index 01bb12eb089..f65e5299041 100644
--- a/config/consumer.properties
+++ b/config/consumer.properties
@@ -4,23 +4,135 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-# see org.apache.kafka.clients.consumer.ConsumerConfig for more details
 
-# list of brokers used for bootstrapping knowledge about the rest of the cluster
-# format: host1:port1,host2:port2 ...
+# See org.apache.kafka.clients.consumer.ConsumerConfig for more details.
+# Consider using environment variables or external configuration management
+# for sensitive information like passwords and environment-specific settings.
+
+##################### Consumer Basics #####################
+
+# List of Kafka brokers used for initial cluster discovery and metadata retrieval.
+# Format: host1:port1,host2:port2,host3:port3
+# Include all brokers for high availability.
 bootstrap.servers=localhost:9092
 
-# consumer group id
+# Client identifier for logging and metrics.
+# Helps with debugging and monitoring.
+client.id=test-consumer
+
+##################### Transaction Support #####################
+
+# Isolation level for reading messages.
+# Options: read_uncommitted (default), read_committed (for exactly-once semantics).
+isolation.level=read_uncommitted
+
+##################### Consumer Group Configuration #####################
+
+# Unique identifier for this consumer group.
+# All consumers with the same group.id will share partition consumption.
 group.id=test-consumer-group
 
-# What to do when there is no initial offset in Kafka or if the current
-# offset does not exist any more on the server: latest, earliest, none
-#auto.offset.reset=
+# What to do when there is no initial offset or if the current offset no longer exists.
+# Options: earliest (from beginning), latest (from end), none (throw exception).
+# Use 'earliest' to avoid data loss on first run.
+auto.offset.reset=earliest
+
+##################### Partition Assignment Strategy #####################
+
+# Strategy for assigning partitions to consumers in a group.
+# Options: RangeAssignor, RoundRobinAssignor, StickyAssignor, CooperativeStickyAssignor.
+# CooperativeStickyAssignor is recommended (requires Kafka 2.4+).
+partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
+
+##################### Deserialization #####################
+
+# Deserializer class for message keys.
+# Common options: StringDeserializer, ByteArrayDeserializer, AvroDeserializer.
+key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+
+# Deserializer class for message values.
+value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+
+##################### Offset Management #####################
+
+# Whether to automatically commit offsets in the background.
+# Set to false for manual offset management and exactly-once processing.
+enable.auto.commit=true
+
+# Frequency (in milliseconds) at which offsets are auto-committed.
+# Lower values provide better fault tolerance but increase broker load.
+auto.commit.interval.ms=5000
+
+##################### Classic Group Session Management #####################
+
+# Timeout for detecting consumer failures when using group management.
+# Must be between group.min.session.timeout.ms and group.max.session.timeout.ms (broker config).
+session.timeout.ms=30000
+
+# Expected time between heartbeats when using group management.
+# Should be lower than session.timeout.ms (typically 1/3 of session timeout).
+heartbeat.interval.ms=10000
+
+# Maximum time between successive calls to poll().
+# If exceeded, consumer is considered failed and partition rebalancing occurs.
+max.poll.interval.ms=300000
+
+##################### Retry And Error Handling #####################
+
+# Initial and max time to wait for failed request retries.
+# The retry.backoff.ms is the initial backoff value and will increase exponentially
+# for each failed request, up to the retry.backoff.max.ms value.
+retry.backoff.ms=100
+retry.backoff.max.ms=1000
+
+# Total time to wait for a response to a request.
+request.timeout.ms=40000
+
+# Close idle connections after this many milliseconds.
+connections.max.idle.ms=540000
+
+##################### Security Configuration #####################
+
+# Security protocol for communication with brokers.
+# Options: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
+#security.protocol=SASL_SSL
+
+# SSL configuration.
+#ssl.truststore.location=/path/to/truststore.jks
+#ssl.truststore.password=truststore-password
+
+# SASL configuration.
+#sasl.mechanism=PLAIN
+#sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+#    username="your-username" \
+#    password="your-password";
+
+##################### Performance And Throughput #####################
+
+# Minimum data size (bytes) and maximum polling timeout (ms).
+# Whichever condition is met first will trigger the fetch operation.
+# Balances response latency against message batching efficiency.
+# For remote partition fetching, configure remote.fetch.max.wait.ms instead.
+fetch.min.bytes=1
+fetch.max.wait.ms=500
+
+# Set soft limits on the number of bytes per fetch request and per partition.
+# Both max.partition.fetch.bytes and fetch.max.bytes limits can be exceeded when
+# the first batch in the first non-empty partition is larger than the configured
+# value, to ensure that the consumer can make progress.
+# Configuring message.max.bytes (broker config) or max.message.bytes (topic config)
+# <= fetch.max.bytes prevents oversized fetch responses.
+fetch.max.bytes=52428800
+max.partition.fetch.bytes=1048576
+
+# Maximum number of records returned in a single poll() call.
+# Higher values increase throughput but may cause longer processing delays.
+max.poll.records=500
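
Taken together, the file above is now a complete, loadable consumer configuration. A minimal sketch of wiring a consumer from it (the relative path and the test-topic name are illustrative; assumes execution from the repository root):

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    public class ConsumerFromFile {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            try (InputStream in = Files.newInputStream(Path.of("config/consumer.properties"))) {
                props.load(in);
            }
            // group.id, deserializers, auto.offset.reset, etc. all come from the file.
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("test-topic"));
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(r -> System.out.printf("%s=%s%n", r.key(), r.value()));
            }
        }
    }
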
diff --git a/config/producer.properties b/config/producer.properties
index 3a999e7c17e..6165ce9ff57 100644
--- a/config/producer.properties
+++ b/config/producer.properties
@@ -12,35 +12,127 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-# see org.apache.kafka.clients.producer.ProducerConfig for more details
 
-############################# Producer Basics #############################
+# See org.apache.kafka.clients.producer.ProducerConfig for more details.
+# Consider using environment variables or external configuration management
+# for sensitive information like passwords and environment-specific settings.
 
-# list of brokers used for bootstrapping knowledge about the rest of the cluster
-# format: host1:port1,host2:port2 ...
+##################### Producer Basics #####################
+
+# List of Kafka brokers used for initial cluster discovery and metadata retrieval.
+# Format: host1:port1,host2:port2,host3:port3
+# Include all brokers for high availability.
 bootstrap.servers=localhost:9092
 
-# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
-compression.type=none
+# Client identifier for logging and metrics.
+# Helps with debugging and monitoring.
+client.id=test-producer
+
+##################### Transaction Support #####################
+
+# Transactional ID for the producer.
+# Must be unique across all producer instances.
+# Enables exactly-once semantics across multiple partitions/topics.
+#transactional.id=test-transactional-id
+
+# Maximum amount of time in milliseconds that a transaction will remain open.
+# Only applies when transactional.id is set.
+transaction.timeout.ms=60000
+
+##################### Partitioning #####################
+
+# Name of the partitioner class for partitioning records.
+# Default uses "sticky" partitioning which improves throughput by filling 
batches
+# Options: DefaultPartitioner, RoundRobinPartitioner, UniformStickyPartitioner.
+#partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
+
+##################### Serialization #####################
+
+# Serializer class for message keys.
+# Common options: StringSerializer, ByteArraySerializer, AvroSerializer.
+key.serializer=org.apache.kafka.common.serialization.StringSerializer
+
+# Serializer class for message values.
+value.serializer=org.apache.kafka.common.serialization.StringSerializer
+
+##################### Reliability And Durability #####################
+
+# Number of acknowledgments the producer requires the leader to have received.
+# Options: 0 (no ack), 1 (leader only), all/-1 (all in-sync replicas).
+# Use 'all' for maximum durability.
+acks=all
 
-# name of the partitioner class for partitioning records;
-# The default uses "sticky" partitioning logic which spreads the load evenly 
between partitions, but improves throughput by attempting to fill the batches 
sent to each partition.
-#partitioner.class=
+# Number of retries for failed sends.
+# Set to high value or Integer.MAX_VALUE for maximum reliability.
+retries=2147483647
 
-# the maximum amount of time the client will wait for the response of a request
-#request.timeout.ms=
+# Initial and max time to wait for failed request retries.
+# The retry.backoff.ms is the initial backoff value and will increase exponentially
+# for each failed request, up to the retry.backoff.max.ms value.
+retry.backoff.ms=100
+retry.backoff.max.ms=1000
 
-# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
-#max.block.ms=
+# Enable idempotent producer to prevent duplicate messages.
+# Ensures exactly-once delivery semantics when combined with proper consumer settings.
+enable.idempotence=true
+
+# Maximum number of unacknowledged requests the client will send on a single connection.
+# Must be <= 5 when enable.idempotence=true to maintain ordering guarantees.
+max.in.flight.requests.per.connection=5
+
+##################### Timeouts And Blocking #####################
+
+# Maximum amount of time the client will wait for the response of a request.
+# Should be higher than replica.lag.time.max.ms (broker config).
+request.timeout.ms=30000
+
+# How long KafkaProducer.send() and KafkaProducer.partitionsFor() will block.
+# Should be higher than request.timeout.ms.
+max.block.ms=60000
+
+# Upper bound on the time to report success or failure after a call to send() returns.
+# Should be greater than or equal to the sum of request.timeout.ms and linger.ms.
+delivery.timeout.ms=120000
+
+##################### Security Configuration #####################
+
+# Security protocol for communication with brokers.
+# Options: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
+#security.protocol=SASL_SSL
+
+# SSL configuration.
+#ssl.truststore.location=/path/to/truststore.jks
+#ssl.truststore.password=truststore-password
+
+# SASL configuration.
+#sasl.mechanism=PLAIN
+#sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+#    username="your-username" \
+#    password="your-password";
+
+##################### Performance And Throughput #####################
+
+# Compression codec for all data generated.
+# Options: none, gzip, snappy, lz4, zstd.
+# Can greatly improve throughput at the cost of increased CPU usage.
+compression.type=none
 
-# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
-#linger.ms=
+# Producer will wait up to this delay to batch records together.
+# Higher values increase throughput but add latency.
+# Set to 0 for lowest latency, 5-100ms for balanced throughput/latency.
+linger.ms=5
 
-# the maximum size of a request in bytes
-#max.request.size=
+# Default batch size in bytes when batching multiple records sent to a partition.
+# Larger batches improve throughput but use more memory.
+# 16KB is a good starting point; adjust based on message size and throughput needs.
+batch.size=16384
 
-# the default batch size in bytes when batching multiple records sent to a partition
-#batch.size=
+# Total bytes of memory the producer can use to buffer records waiting to be sent.
+# Should be larger than batch.size * number of partitions you're writing to.
+# 32MB is reasonable for most use cases.
+buffer.memory=33554432
 
-# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
-#buffer.memory=
+# Maximum size of a request in bytes.
+# Should accommodate your largest batch size plus overhead.
+# 1MB is default and suitable for most cases.
+max.request.size=1048576
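
As with the consumer file, these defaults form a working producer configuration out of the box. A minimal sketch of a producer driven by it (the path and topic are illustrative; assumes execution from the repository root):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Properties;

    public class ProducerFromFile {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            try (InputStream in = Files.newInputStream(Path.of("config/producer.properties"))) {
                props.load(in);
            }
            // acks, retries, idempotence, batching, etc. all come from the file.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                RecordMetadata meta = producer.send(new ProducerRecord<>("test-topic", "key", "value")).get();
                System.out.printf("Wrote to %s-%d at offset %d%n", meta.topic(), meta.partition(), meta.offset());
            }
        }
    }
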
