Hi Bhavesh,
I'm not totally sure I understand the expected behavior, but I think this
can work. Instead of seeking to the start of the range before the poll
loop, you should probably provide a ConsumerRebalanceCallback to get
notifications when group assignment has changed (e.g. when one of your
nodes dies). When a new partition is assigned, the callback will be invoked
by the consumer and you can use it to check if there's a committed position
in the range or if you need to seek to the beginning of the range. For
example:
void onPartitionsAssigned(consumer, partitions) {
    for (partition : partitions) {
        try {
            offset = consumer.committed(partition)
            consumer.seek(partition, offset)
        } catch (NoOffsetForPartition) {
            consumer.seek(partition, rangeStart)
        }
    }
}
If a failure occurs, then the partitions will be rebalanced across
whichever consumers are still active. The case of the entire cluster being
rebooted is not really different. When the consumers come back, they check
the committed position and resume where they left off. Does that make sense?
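To make that concrete, the decision inside the callback boils down to choosing a start position per partition. Here's a minimal, self-contained sketch of just that logic; the OptionalLong wrapper for the committed offset and the range bounds are my own framing for the example, not part of the consumer API:

```java
import java.util.OptionalLong;

// Sketch of the resume decision made inside the rebalance callback above.
// On assignment, each partition either resumes from a committed offset
// (fail-over / cluster-reboot case) or seeks to the start of the range.
public class ResumePosition {
    static long decideStartOffset(OptionalLong committed, long rangeStart, long rangeEnd) {
        // No committed position yet: this is the first run, begin at the range start.
        if (!committed.isPresent()) {
            return rangeStart;
        }
        long offset = committed.getAsLong();
        // A committed position inside the range means a consumer died (or the
        // whole cluster was rebooted) mid-range: pick up where it left off.
        if (offset >= rangeStart && offset <= rangeEnd) {
            return offset;
        }
        // A committed offset outside the range (e.g. the group was previously
        // used for a different range): fall back to the range start.
        return rangeStart;
    }
}
```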
After you are finished consuming a partition's range, you can use
KafkaConsumer.pause(partition) to prevent further fetches from being
initiated while still maintaining the current assignment. The patch to add
pause() is not in trunk yet, but it probably will be before too long.
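In case it helps, here is a rough sketch of the bookkeeping you'd wrap around pause(). The consumer object itself is left out since pause() isn't in trunk yet; the comment marks where the real call would go:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Tracks per-partition end offsets and reports when a partition's range is
// done, i.e. the point at which you would call KafkaConsumer.pause(partition)
// to stop further fetches while keeping the current assignment.
public class RangeTracker {
    private final Map<Integer, Long> endOffsets = new HashMap<>();
    private final Set<Integer> finished = new HashSet<>();

    RangeTracker(Map<Integer, Long> endOffsets) {
        this.endOffsets.putAll(endOffsets);
    }

    // Returns true exactly once per partition, when a consumed record
    // reaches the end of that partition's range.
    boolean recordConsumed(int partition, long offset) {
        Long end = endOffsets.get(partition);
        if (end != null && offset >= end && finished.add(partition)) {
            // real code would call consumer.pause(partition) here
            return true;
        }
        return false;
    }

    boolean allFinished() {
        return finished.size() == endOffsets.size();
    }
}
```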
One potential problem is that you wouldn't be able to reuse the same group
to consume a different range because of the way it depends on the committed
offsets. Kafka's commit API actually allows some additional metadata to go
along with a committed offset and that could potentially be used to tie the
commit to the range, but it's not yet exposed in KafkaConsumer. I assume it
will be eventually, but I'm not sure whether that will be part of the
initial release.
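Just to illustrate, if and when that metadata does get exposed, the payload could be as simple as a string encoding the range. The "range=start-end" format below is purely made up for the example, not anything Kafka defines:

```java
// Sketch of how commit metadata could tie a committed offset to a range,
// assuming a future API that lets a commit carry a metadata string (which the
// broker-side commit API already allows).
public class RangeMetadata {
    static String encode(long rangeStart, long rangeEnd) {
        return "range=" + rangeStart + "-" + rangeEnd;
    }

    // A consumer starting on a new range would compare the stored metadata to
    // its own range and ignore the committed offset on a mismatch.
    static boolean matches(String metadata, long rangeStart, long rangeEnd) {
        return encode(rangeStart, rangeEnd).equals(metadata);
    }
}
```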
Hope that helps!
Jason
On Thu, Jul 30, 2015 at 7:54 AM, Bhavesh Mistry <[email protected]>
wrote:
> Hello Kafka Dev Team,
>
>
> With the new consumer API redesign (
>
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
> ), is there a capability to consume a given topic and partition between a
> start and end position? How would I achieve the following use case of range
> consumption with fail-over?
>
>
> Use Case:
> Ability to reload data given a topic and its partitions' start/end offsets
> with the high-level consumer, with fail-over. Basically, high-level range
> consumption that keeps working when a consumer in the group dies.
>
>
> Suppose you have a topic called “test-topic” and its partitions' begin and
> end offsets:
>
> {
>
>   topic: test-topic,
>
>   partitions: [
>
>     { partition id: 1, offset start: 100, offset end: 500,000 },
>
>     { partition id: 2, offset start: 200,000, offset end: 500,000 },
>
>     ….. for n partitions
>
>   ]
>
> }
>
> Then you create a consumer group “Range-Consumer” and use the seek method
> for each partition. Your feedback is greatly appreciated.
>
>
> In each JVM,
>
>
> for each consumption thread:
>
>
> Consumer consumer = new KafkaConsumer({ group.id = ”Range-Consumer” } …)
>
> Map<Integer, Long> partitionToEndOffsetMapping = …
>
> for (TopicPartition tp : topicPartitionList) {
>
>     consumer.seek(tp, startOffset)
>
> }
>
>
> while (true) {
>
>     ConsumerRecords records = consumer.poll(10000);
>
>     // for each record check the offset
>
>     for (ConsumerRecord record : records) {
>
>         if (record.offset() <= partitionToEndOffsetMapping.get(record.partition())) {
>
>             // consume record
>
>             // commit offset
>
>             consumer.commit(CommitType.SYNC);
>
>         } else {
>
>             // Should I unsubscribe from this partition now?
>
>             consumer.unsubscribe(record.partition());
>
>         }
>
>     }
>
> }
>
>
>
>
> Please let me know if the above approach is valid:
>
> 1) How will fail-over work?
>
> 2) How does rebooting the entire consumer group impact the offset seeks,
> since offsets are stored by Kafka itself?
>
> Thanks ,
>
> Bhavesh
>