Repository: kafka
Updated Branches:
  refs/heads/trunk c24740c7b -> bf83131df
KAFKA-1328 follow up: Updated javadoc Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bf83131d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bf83131d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bf83131d Branch: refs/heads/trunk Commit: bf83131dffbdb6c39de0135e0426701ca141f150 Parents: c24740c Author: Neha Narkhede <[email protected]> Authored: Tue May 20 22:11:19 2014 -0700 Committer: Neha Narkhede <[email protected]> Committed: Tue May 20 22:11:19 2014 -0700 ---------------------------------------------------------------------- .../kafka/clients/consumer/KafkaConsumer.java | 48 +++++++++++--------- .../clients/consumer/ConsumerExampleTest.java | 5 +- 2 files changed, 29 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/bf83131d/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 18bcc90..fe93afa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -57,7 +57,11 @@ import org.slf4j.LoggerFactory; * for(int i = 0;i < recordsPerTopic.size();i++) { * ConsumerRecord record = recordsPerTopic.get(i); * // process record - * processedOffsets.put(record.partition(), record.offset()); + * try { + * processedOffsets.put(record.topicAndpartition(), record.offset()); + * } catch (Exception e) { + * e.printStackTrace(); + * } * } * } * return processedOffsets; @@ -80,7 +84,7 @@ import org.slf4j.LoggerFactory; * consumer.subscribe("foo", "bar"); * boolean isRunning = true; * while(isRunning) { - * Map<String, 
ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS); + * Map<String, ConsumerRecords> records = consumer.poll(100); * process(records); * } * consumer.close(); @@ -88,7 +92,7 @@ import org.slf4j.LoggerFactory; * </pre> * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for automatic consumer load * balancing and failover. This example assumes that the offsets are stored in Kafka and are manually committed using - * the commit() API. This example also demonstrates rewinding the consumer's offsets if processing of the consumed + * the commit(boolean) API. This example also demonstrates rewinding the consumer's offsets if processing of the consumed * messages fails. Note that this method of rewinding offsets using {@link #seek(Map) seek(offsets)} is only useful for rewinding the offsets * of the current consumer instance. As such, this will not trigger a rebalance or affect the fetch offsets for the other consumer instances. 
* <pre> @@ -105,14 +109,14 @@ import org.slf4j.LoggerFactory; * boolean isRunning = true; * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>(); * while(isRunning) { - * Map<String, ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS); + * Map<String, ConsumerRecords> records = consumer.poll(100); * try { * Map<TopicPartition, Long> lastConsumedOffsets = process(records); * consumedOffsets.putAll(lastConsumedOffsets); * numRecords += records.size(); * // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance * if(numRecords % commitInterval == 0) - * consumer.commit(); + * consumer.commit(false); * } catch(Exception e) { * try { * // rewind consumer's offsets for failed partitions @@ -155,14 +159,14 @@ import org.slf4j.LoggerFactory; * KafkaConsumer consumer = new KafkaConsumer(props, * new ConsumerRebalanceCallback() { * boolean rewindOffsets = true; // should be retrieved from external application config - * public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) { + * public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) { * Map<TopicPartition, Long> latestCommittedOffsets = consumer.committed(partitions); * if(rewindOffsets) * Map<TopicPartition, Long> newOffsets = rewindOffsets(latestCommittedOffsets, 100); * consumer.seek(newOffsets); * } - * public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions) { - * consumer.commit(); + * public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) { + * consumer.commit(true); * } * // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages * private Map<TopicPartition, Long> rewindOffsets(Map<TopicPartition, Long> currentOffsets, @@ -179,14 +183,15 @@ import org.slf4j.LoggerFactory; * boolean isRunning = true; * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>(); * 
while(isRunning) { - * Map<String, ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS); + * Map<String, ConsumerRecords> records = consumer.poll(100); * Map<TopicPartition, Long> lastConsumedOffsets = process(records); * consumedOffsets.putAll(lastConsumedOffsets); * numRecords += records.size(); * // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance * if(numRecords % commitInterval == 0) - * consumer.commit(consumedOffsets); + * consumer.commit(consumedOffsets, true); * } + * consumer.commit(true); * consumer.close(); * } * </pre> @@ -208,19 +213,19 @@ import org.slf4j.LoggerFactory; * props.put("enable.auto.commit", "false"); // since enable.auto.commit only applies to Kafka based offset storage * KafkaConsumer consumer = new KafkaConsumer(props, * new ConsumerRebalanceCallback() { - * public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) { + * public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) { * Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions); * consumer.seek(lastCommittedOffsets); * } - * public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions) { + * public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) { * Map<TopicPartition, Long> offsets = getLastConsumedOffsets(partitions); * commitOffsetsToCustomStore(offsets); * } * // following APIs should be implemented by the user for custom offset management - * private Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore(TopicPartition... partitions) { + * private Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore(Collection<TopicPartition> partitions) { * return null; * } - * private Map<TopicPartition, Long> getLastConsumedOffsets(TopicPartition... 
partitions) { return null; } + * private Map<TopicPartition, Long> getLastConsumedOffsets(Collection<TopicPartition> partitions) { return null; } * private void commitOffsetsToCustomStore(Map<TopicPartition, Long> offsets) {} * }); * consumer.subscribe("foo", "bar"); @@ -229,7 +234,7 @@ import org.slf4j.LoggerFactory; * boolean isRunning = true; * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>(); * while(isRunning) { - * Map<String, ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS); + * Map<String, ConsumerRecords> records = consumer.poll(100); * Map<TopicPartition, Long> lastConsumedOffsets = process(records); * consumedOffsets.putAll(lastConsumedOffsets); * numRecords += records.size(); @@ -237,6 +242,7 @@ import org.slf4j.LoggerFactory; * if(numRecords % commitInterval == 0) * commitOffsetsToCustomStore(consumedOffsets); * } + * consumer.commit(true); * consumer.close(); * } * </pre> @@ -262,15 +268,15 @@ import org.slf4j.LoggerFactory; * partitions[1] = partition1; * consumer.subscribe(partitions); * // find the last committed offsets for partitions 0,1 of topic foo - * Map<TopicPartition, Long> lastCommittedOffsets = consumer.committed(partition0, partition1); + * Map<TopicPartition, Long> lastCommittedOffsets = consumer.committed(Arrays.asList(partitions)); * // seek to the last committed offsets to avoid duplicates * consumer.seek(lastCommittedOffsets); * // find the offsets of the latest available messages to know where to stop consumption - * Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, partition0, partition1); + * Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, Arrays.asList(partitions)); * boolean isRunning = true; * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>(); * while(isRunning) { - * Map<String, ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS); + * Map<String, ConsumerRecords> 
records = consumer.poll(100); * Map<TopicPartition, Long> lastConsumedOffsets = process(records); * consumedOffsets.putAll(lastConsumedOffsets); * for(TopicPartition partition : partitions) { @@ -280,7 +286,7 @@ import org.slf4j.LoggerFactory; * isRunning = true; * } * } - * consumer.commit(); + * consumer.commit(true); * consumer.close(); * } * </pre> @@ -304,11 +310,11 @@ import org.slf4j.LoggerFactory; * // seek to the last committed offsets to avoid duplicates * consumer.seek(lastCommittedOffsets); * // find the offsets of the latest available messages to know where to stop consumption - * Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, partition0, partition1); + * Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, Arrays.asList(partitions)); * boolean isRunning = true; * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>(); * while(isRunning) { - * Map<String, ConsumerRecords> records = consumer.poll(100, TimeUnit.MILLISECONDS); + * Map<String, ConsumerRecords> records = consumer.poll(100); * Map<TopicPartition, Long> lastConsumedOffsets = process(records); * consumedOffsets.putAll(lastConsumedOffsets); * // commit offsets for partitions 0,1 for topic foo to custom store http://git-wip-us.apache.org/repos/asf/kafka/blob/bf83131d/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java index 0548fb4..29ad25e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java @@ -53,9 +53,8 @@ public class ConsumerExampleTest { /** * This example demonstrates how to use the consumer to leverage 
Kafka's group management functionality for automatic consumer load
- * balancing and failure detection. This example assumes that the offsets are stored in Kafka and are manually committed using
- * either the commit() or commitAsync() APIs. This example also demonstrates rewinding the consumer's offsets if processing of consumed
- * messages fails.
+ * balancing and failure detection. This example assumes that the offsets are stored in Kafka and are manually committed using the
+ * commit() API. This example also demonstrates rewinding the consumer's offsets if processing of consumed messages fails.
 */
 // @Test
 // public void testConsumerGroupManagementWithManualOffsetCommit() {
