Repository: kafka Updated Branches: refs/heads/trunk 5ae97196a -> 82c219149
KAFKA-2478: Fix manual committing example in javadoc Committing before inserting all records into the database might lead to some records being lost. I've changed the example to commit only after all records returned by `poll` are inserted into the database. Author: Dmitry Stratiychuk <[email protected]> Reviewers: Jason Gustafson, Guozhang Wang Closes #210 from shtratos/KAFKA-2478 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/82c21914 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/82c21914 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/82c21914 Branch: refs/heads/trunk Commit: 82c219149027e8d96840af98d32fb1b877ab4ec2 Parents: 5ae9719 Author: Dmitry Stratiychuk <[email protected]> Authored: Tue Jan 26 11:04:58 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Tue Jan 26 11:04:58 2016 -0800 ---------------------------------------------------------------------- .../kafka/clients/consumer/KafkaConsumer.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/82c21914/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 2f78139..afe3240 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -145,7 +145,7 @@ import java.util.regex.Pattern; * props.put("session.timeout.ms", "30000"); * props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); * props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - * KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); + * KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); * consumer.subscribe(Arrays.asList("foo", "bar")); * while (true) { * ConsumerRecords<String, String> records = consumer.poll(100); @@ -200,19 +200,19 @@ import java.util.regex.Pattern; * props.put("session.timeout.ms", "30000"); * props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); * props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - * KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); + * KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); * consumer.subscribe(Arrays.asList("foo", "bar")); - * int commitInterval = 200; - * List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>(); + * final int minBatchSize = 200; + * List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); * while (true) { * ConsumerRecords<String, String> records = consumer.poll(100); * for (ConsumerRecord<String, String> record : records) { * buffer.add(record); - * if (buffer.size() >= commitInterval) { - * insertIntoDb(buffer); - * consumer.commitSync(); - * buffer.clear(); - * } + * } + * if (buffer.size() >= minBatchSize) { + * insertIntoDb(buffer); + * consumer.commitSync(); + * buffer.clear(); * } * } * </pre>
