Hi Robin,

Glad it's working. I'll explain:

When a consumer subscribes to one or more topics using subscribe(), the
consumer group coordinator is responsible for assigning partitions to each
consumer in the consumer group, so that every partition of the topic is
being consumed. The coordinator also handles failover and rebalancing when
consumers join or leave the group.

When you called seek() without first calling poll(), the consumer hadn't
received its partition assignments yet, so the seek() call didn't behave as
expected. Once you called poll(), the partition assignments were made, and
seek() behaved as expected.

An alternative to using subscribe() is to use assign(). However, assign()
bypasses the consumer group coordinator, which means you lose the automatic
failover and rebalancing features and have to handle those cases yourself
(which is cumbersome).
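For reference, here's a minimal sketch of the poll-then-seek pattern, based on the snippet in your earlier mail. The topic name ("topic1"), partition 0, offset 500, the broker address, and the group id are just placeholders from your example; you'd substitute your own. It needs the kafka-clients library and a running broker, so it's illustrative rather than something you can run standalone:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekAfterAssignment {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "example-group");           // placeholder group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("topic1"));

        // The first poll() triggers the group join, so the coordinator
        // assigns partitions to this consumer. Any records it returns are
        // from the default position and can be ignored here.
        consumer.poll(0);

        // Now that the partition is assigned, seek() takes effect.
        consumer.seek(new TopicPartition("topic1", 0), 500);

        // Subsequent polls return records starting at offset 500.
        ConsumerRecords<Integer, String> records = consumer.poll(1000);
        records.forEach(record ->
                System.out.println(record.offset() + ": " + record.value()));

        consumer.close();
    }
}
```

If you went the assign() route instead, you'd replace the subscribe() and first poll() with consumer.assign(Collections.singletonList(new TopicPartition("topic1", 0))), after which seek() works immediately, since there's no coordinator assignment to wait for.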

Does that make sense?

Alex

On Thu, Feb 18, 2016 at 1:14 AM, Péricé Robin <perice.ro...@gmail.com>
wrote:

> Hi,
>
> Ok, I did a poll() before my seek() and poll() again, and now my consumer
> starts at the offset.
>
> Thank you a lot! But I don't really understand why I have to do that, if
> anyone can explain it to me.
>
> Regards,
>
> Robin
>
> 2016-02-17 20:39 GMT+01:00 Alex Loddengaard <a...@confluent.io>:
>
> > Hi Robin,
> >
> > I believe seek() needs to be called after the consumer gets its partition
> > assignments. Try calling poll() before you call seek(), then poll() again
> > and process the records from the latter poll().
> >
> > There may be a better way to do this -- let's see if anyone else has a
> > suggestion.
> >
> > Alex
> >
> > On Wed, Feb 17, 2016 at 9:13 AM, Péricé Robin <perice.ro...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I'm trying to use the new Consumer API with this example :
> > >
> > > https://github.com/apache/kafka/tree/trunk/examples/src/main/java/kafka/examples
> > >
> > > With a Producer I sent 1000 messages to my Kafka broker. I need to know
> > if
> > > it's possible, for example, to read message from offset 500 to 1000.
> > >
> > > What I did :
> > >
> > >
> > >    - consumer.seek(new TopicPartition("topic1", 0), 500);
> > >
> > >    - final ConsumerRecords<Integer, String> records = consumer.poll(1000);
> > >
> > >
> > > But this did nothing (when I don't use the seek() method, I consume all
> > > the messages without any problems).
> > >
> > > Any help on this will be greatly appreciated !
> > >
> > > Regards,
> > >
> > > Robin
> > >
> >
> >
> >
> > --
> > *Alex Loddengaard | **Solutions Architect | Confluent*
> > *Download Apache Kafka and Confluent Platform: www.confluent.io/download
> > <http://www.confluent.io/download>*
> >
>



-- 
*Alex Loddengaard | **Solutions Architect | Confluent*
*Download Apache Kafka and Confluent Platform: www.confluent.io/download
<http://www.confluent.io/download>*
