Hey Bhavesh,

I think your use case can be handled with the new consumer API in roughly
the manner I suggested previously. It might be a little easier if we added
the ability to set the end offset for consumption. Perhaps something like
this:

// stop consumption from the partition when offset is reached
void limit(TopicPartition partition, long offset)

My guess is that we'd have a bit of an uphill battle to get this into the
first release, but it may be possible if the use case is common enough. In
any case, I think consuming to the limit offset and manually pausing the
partition is a viable alternative.
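
To illustrate that alternative, here is a minimal sketch of the bookkeeping
involved. It is deliberately independent of the Kafka client: RangeTracker
and its method names are hypothetical, and it assumes end offsets are
exclusive and keyed by partition id. The pause call itself is left to the
caller.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper (not part of Kafka): tracks per-partition end offsets. */
public class RangeTracker {
    // Assumption: the end offset is exclusive, i.e. the first offset NOT to consume.
    private final Map<Integer, Long> endOffsets;
    private final Set<Integer> finished = new HashSet<>();

    public RangeTracker(Map<Integer, Long> endOffsets) {
        this.endOffsets = new HashMap<>(endOffsets);
    }

    /**
     * Returns true if a record at this offset lies inside the range and should
     * be processed. Marks the partition finished once the last in-range offset
     * (or anything beyond it) has been seen.
     */
    public boolean shouldProcess(int partition, long offset) {
        Long end = endOffsets.get(partition);
        if (end == null) {
            return false; // partition is not part of the requested range
        }
        if (offset + 1 >= end) {
            finished.add(partition);
        }
        return offset < end;
    }

    /** True once the partition has reached its limit and can be paused. */
    public boolean isFinished(int partition) {
        return finished.contains(partition);
    }

    /** True once every tracked partition has reached its limit. */
    public boolean allFinished() {
        return finished.containsAll(endOffsets.keySet());
    }
}
```

In the poll loop, you would call shouldProcess() for each record, pause the
partition once isFinished() returns true for it, and shut the consumer down
once allFinished() returns true.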

As for your question about fail-over, the new consumer provides a similar
capability to the old high-level consumer. Here is a link to the wiki which
describes its design:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

-Jason

On Tue, Aug 4, 2015 at 12:01 AM, Bhavesh Mistry <mistry.p.bhav...@gmail.com>
wrote:

> Hi Jason and Kafka Dev Team,
>
>
>
> First of all, thanks for responding. I think you understood the expected
> behavior correctly.
>
>
>
> The use case is offset-range consumption.  Every minute, we store the
> highest offset for each partition of each topic.  So if we need to reload or
> re-consume yesterday's data from, say, 8AM to noon, we would look up the
> start offset mapping at 8AM and the end offset mapping at noon in a
> time-series database.
>
>
>
> I was trying to implement this use case with the new consumer API.  Would
> you or the Kafka dev team agree to a request for an API that takes a topic
> and its start/end offsets for a high-level consumer group?  (With the older
> consumer API we used the simple consumer, without fail-over.)  Also, each
> range consumption will use a different group id, and group ids will not be
> reused.  The main purpose is to occasionally reload or reprocess past data
> (due to production bugs, downtime, etc.) while letting the main consumer
> group continue to consume the latest records.
>
>
> void subscribe(TopicPartition[] startOffsetPartitions, TopicPartition[]
> endOffsetPartitions)
>
>
>
> or something similar which would allow the following:
>
>
>
> 1)   When the consumer group already exists (meaning it has consumed data
> and committed offsets to a storage system, either Kafka or ZK), ignore the
> start offset positions and use the committed offsets.  If nothing has been
> committed, use the start offset for the partition.
>
> 2)   When consumption has reached the end offset for a given partition,
> pausing is fine, or the assigned thread can become a fail-over standby or
> wait for reassignment.
>
> 3)   When the consumer group is done consuming the offset ranges (start to
> end) of all partitions, gracefully shut down the entire consumer group.
>
> 4)   While consuming records, if one of the nodes or consuming threads goes
> down, automatically fail over to the others (similar to the high-level
> consumer in the old consumer API; I am not sure whether the new API has a
> high-level and/or simple consumer concept).
>
>
>
> I hope the above explanation clarifies the use case and intended behavior.
> Thanks for the clarifications, and you are correct: we need
> pause(TopicPartition tp), resume(TopicPartition tp), and/or an API to set
> the end offset for each partition.
>
>
>
> Please let us know how you would prefer to support the above simple use case.
>
>
> Thanks,
>
>
> Bhavesh
>
> On Thu, Jul 30, 2015 at 1:23 PM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hi Bhavesh,
> >
> > I'm not totally sure I understand the expected behavior, but I think this
> > can work. Instead of seeking to the start of the range before the poll
> > loop, you should probably provide a ConsumerRebalanceCallback to get
> > notifications when group assignment has changed (e.g. when one of your
> > nodes dies). When a new partition is assigned, the callback will be
> > invoked by the consumer and you can use it to check if there's a
> > committed position in the range or if you need to seek to the beginning
> > of the range. For example:
> >
> > void onPartitionsAssigned(consumer, partitions) {
> >   for (partition : partitions) {
> >      try {
> >        offset = consumer.committed(partition)
> >        consumer.seek(partition, offset)
> >      } catch (NoOffsetForPartition) {
> >        consumer.seek(partition, rangeStart)
> >      }
> >   }
> > }
> >
> > If a failure occurs, then the partitions will be rebalanced across
> > whichever consumers are still active. The case of the entire cluster
> > being rebooted is not really different. When the consumers come back,
> > they check the committed position and resume where they left off. Does
> > that make sense?
> >
> > After you are finished consuming a partition's range, you can use
> > KafkaConsumer.pause(partition) to prevent further fetches from being
> > initiated while still maintaining the current assignment. The patch to
> > add pause() is not in trunk yet, but it probably will be before too long.
> >
> > One potential problem is that you wouldn't be able to reuse the same
> > group to consume a different range because of the way it depends on the
> > committed offsets. Kafka's commit API actually allows some additional
> > metadata to go along with a committed offset and that could potentially
> > be used to tie the commit to the range, but it's not yet exposed in
> > KafkaConsumer. I assume it will be eventually, but I'm not sure whether
> > that will be part of the initial release.
> >
> >
> > Hope that helps!
> >
> > Jason
> >
> > On Thu, Jul 30, 2015 at 7:54 AM, Bhavesh Mistry <
> > mistry.p.bhav...@gmail.com>
> > wrote:
> >
> > > Hello Kafka Dev Team,
> > >
> > >
> > > With the new Consumer API redesign (
> > > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
> > > ), is there a capability to consume a given topic and partition from a
> > > start position to an end position?  How would I achieve the following
> > > use case of range consumption with fail-over?
> > >
> > >
> > > Use Case:
> > > Ability to reload data for a given topic and its partition offset
> > > start/end with a high-level consumer with fail-over.  Basically,
> > > high-level range consumption, where the range consumer group dies when
> > > done while the main consumer group keeps running.
> > >
> > >
> > > Suppose you have a topic called “test-topic” and its partition begin
> > > and end offsets:
> > >
> > > {
> > >   topic:  test-topic,
> > >   [
> > >     { partition id : 1 ,  offset start:   100,      offset end: 500,000 },
> > >     { partition id : 2 ,  offset start:   200,000,  offset end: 500,000 },
> > >     ….. for n partitions
> > >   ]
> > > }
> > >
> > > Then you create a consumer group, “Range-Consumer”, and use the seek
> > > method for each partition.  Your feedback is greatly appreciated.
> > >
> > >
> > > In each JVM, for each consumption thread:
> > >
> > > Consumer consumer = new KafkaConsumer( { group.id="Range-consumer" }…)
> > >
> > > // end offset of the range, keyed by partition id
> > > Map<Integer, Long> partitionToEndOffsetMapping ….
> > >
> > > for (TopicPartition tp : topicPartitionList) {
> > >     // seek to the start offset of the range for this partition
> > >     consumer.seek(tp, startOffset);
> > > }
> > >
> > > while (true) {
> > >     ConsumerRecords records = consumer.poll(10000);
> > >
> > >     // for each record check the offset
> > >     for (record : records) {
> > >         if (record.offset() <=
> > >                 partitionToEndOffsetMapping.get(record.partition())) {
> > >             // still inside the range: consume record, then commit offset
> > >             consumer.commit(CommitType.SYNC);
> > >         } else {
> > >             // Should I unsubscribe from this partition now?
> > >             consumer.unsubscribe(record.partition());
> > >         }
> > >     }
> > > }
> > >
> > >
> > >
> > >
> > > Please let me know if the above approach is valid:
> > >
> > > 1) How will fail-over work?
> > >
> > > 2) How does rebooting the entire consumer group impact the offset seek,
> > > since offsets are stored by Kafka itself?
> > >
> > > Thanks ,
> > >
> > > Bhavesh
> > >
> >
>
