Requires stopping your existing consumers, but otherwise should work:

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

def reset_offsets(group_id, topic, bootstrap_servers):
  consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
group_id=group_id)
  consumer.assign([TopicPartition(topic, i) for i in
consumer.partitions_for_topic(topic)])
  consumer.commit({tp: OffsetAndMetadata(0, b'') for tp in
consumer.assignment()})
  consumer.close()

On May 3, 2017 9:20 AM, "Ben Stopford" <b...@confluent.io> wrote:

> Hu is correct, there isn't anything currently, but there is an active
> proposal:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 122%3A+Add+Reset+Consumer+Group+Offsets+tooling
>
> On Wed, May 3, 2017 at 1:23 PM Hu Xi <huxi...@hotmail.com> wrote:
>
> > Seems there is no command line out of box, but if you could write a
> simple
> > Java client application that firstly calls 'seek' or 'seekToBeginning' to
> > reset offsets to what you expect and then invoke commitSync to commit the
> > offsets.
> >
> >
> > ________________________________
> > 发件人: Paul van der Linden <p...@sportr.co.uk>
> > 发送时间: 2017年5月3日 18:28
> > 收件人: users@kafka.apache.org
> > 主题: Resetting offsets
> >
> > I'm trying to reset the offsets for all partitions for all topics for a
> > consumer group, but I can't seem to find a working way.
> >
> > The command line tool provides a tool to remove a consumer group (which
> > would be fine in this occasion), but this is not working with the "new"
> > style consumer groups. I tried to set consumer offsets with a client,
> which
> > also didn't work (we are using confluent-kafka-python with librdkafka).
> >
> > Is there any way to reset the offsets (preferable with python or a
> command
> > line tool)?
> >
> > Thanks
> >
>

Reply via email to