Hi Jason and Kafka Dev Team,


First of all thanks for responding and I think you got expected behavior
correctly.



The use-case is offset range consumption.  We store each minute highest
offset for each topic per partition.  So if we need to reload or re-consume
data from yesterday per say 8AM to noon, we would have offset start mapping
at 8AM and end offset mapping at noon in Time Series Database.



I was trying to load this use case with New Consumer API.   Do you or Kafka
Dev team agree with request to either have API that takes in topic and its
start/end offset for High Level Consumer group  (With older consumer API we
used Simple consumer before without fail-over).  Also, for each
range-consumption, there will be different group id  and group id will not
be reused.  The main purpose is to reload or process past data again (due
to production bugs or downtime etc occasionally and let main consumer-group
continue to consume latest records).


void subscribe(TopicPartition[] startOffsetPartitions, TopicPartition[]
endOffsetPartitions)



or something similar which will allow following:



1)   When consumer group already exists (meaning have consumed data and
committed offset to storage system either Kafka or ZK) ignore start offset
positions and use committed offset.  If not committed use start Offset
Partition.

2)   When partition consumption has reached end Offset for given partition,
pause is fine or this assigned thread become fail over or wait for
reassignment.

3)   When all are Consumer Group is done consuming all partitions offset
ranges (start to end), gracefully shutdown entire consumer group.

4)   While consuming records, if one of node or consuming thread goes down
automatic fail-over to others (Similar to High Level Consumer for OLD
Consumer API.   I am not sure if there exists High level and/or Simple
Consumer concept for New API  )



I hope above explanation clarifies use-case and intended behavior.  Thanks
for clarifications, and you are correct we need pause(TopicPartition tp),
resume(TopicPartition tp), and/or API to set to end offset for each
partition.



Please do let us know your preference to support above simple use-case.


Thanks,


Bhavesh

On Thu, Jul 30, 2015 at 1:23 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Hi Bhavesh,
>
> I'm not totally sure I understand the expected behavior, but I think this
> can work. Instead of seeking to the start of the range before the poll
> loop, you should probably provide a ConsumerRebalanceCallback to get
> notifications when group assignment has changed (e.g. when one of your
> nodes dies). When a new partition is assigned, the callback will be invoked
> by the consumer and you can use it to check if there's a committed position
> in the range or if you need to seek to the beginning of the range. For
> example:
>
> void onPartitionsAssigned(consumer, partitions) {
>   for (partition : partitions) {
>      try {
>        offset = consumer.committed(partition)
>        consumer.seek(partition, offset)
>      } catch (NoOffsetForPartition) {
>        consumer.seek(partition, rangeStart)
>      }
>   }
> }
>
> If a failure occurs, then the partitions will be rebalanced across
> whichever consumers are still active. The case of the entire cluster being
> rebooted is not really different. When the consumers come back, they check
> the committed position and resume where they left off. Does that make
> sense?
>
> After you are finished consuming a partition's range, you can use
> KafkaConsumer.pause(partition) to prevent further fetches from being
> initiated while still maintaining the current assignment. The patch to add
> pause() is not in trunk yet, but it probably will be before too long.
>
> One potential problem is that you wouldn't be able to reuse the same group
> to consume a different range because of the way it depends on the committed
> offsets. Kafka's commit API actually allows some additional metadata to go
> along with a committed offset and that could potentially be used to tie the
> commit to the range, but it's not yet exposed in KafkaConsumer. I assume it
> will be eventually, but I'm not sure whether that will be part of the
> initial release.
>
>
> Hope that helps!
>
> Jason
>
> On Thu, Jul 30, 2015 at 7:54 AM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > Hello Kafka Dev Team,
> >
> >
> > With new Consumer API redesign  (
> >
> >
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
> > ),  is there a capability to consume given the topic and partition
> start/
> > end position.  How would I achieve following use case of range
> consumption
> > with fail-over.
> >
> >
> > Use Case:
> > Ability to reload data given topic and its partition offset start/end
> with
> > High Level Consumer with fail over.   Basically, High Level Range
> > consumption and consumer group dies while main consumer group.
> >
> >
> > Suppose you have a topic called “test-topic” and its partition begin and
> > end offset.
> >
> > {
> >
> > topic:  test-topic,
> >
> > [   {      partition id : 1 , offset start:   100,  offset end:
> > 500,000 },
> >
> >
> > {          partition id : 2 ,  offset start:   200,000, offset end:
> > 500,000
> >
> > ….. for n partitions
> >
> > ]
> >
> > }
> >
> > Each you create consumer group: “Range-Consumer “ and use seek method and
> > for each partition.   Your feedback is greatly appreciated.
> >
> >
> > In each JVM,
> >
> >
> > For each consumption tread:
> >
> >
> > Consumer c = KafkaConsumer( { group.id=”Range-consumer}…)
> >
> > Map<Integer, Integer> parttionTOEndOfsetMapping ….
> >
> > for(TopicPartition tp : topicPartitionlist){
> >
> > seek(TopicPartition(Parition 1), long offset)
> >
> > }
> >
> >
> >
> > while(true){
> >
> > ConsumerRecords records = consumer.poll(10000);
> >
> > // for each record check the offset
> >
> >             record = record.iterator().next();
> >
> >             if(parttionTOEndOfsetMapping(record.getPartition()) <=
> > record.getoffset) {
> >               // consume  record
> >
> >             //commit  offset
> >
> >               consumer.commit(CommitType.SYNC);
> >
> >             }else {
> >
> >                         // Should I unsubscribe it now  for this
> partition
> > ?
> >
> >                         consumer.unscribe(record.getPartition)
> >
> >             }
> >
> >
> >
> > }
> >
> >
> >
> >
> > Please let me know if the above approach is valid:
> >
> > 1) how will fail-over work.
> >
> > 2) how Rebooting entire consumer group impacts offset seek ? Since offset
> > are stored by Kafka itsself.
> >
> > Thanks ,
> >
> > Bhavesh
> >
>

Reply via email to