Re: Resetting offsets
kafka-python, yes.

On May 4, 2017 2:28 AM, "Paul van der Linden" wrote:
> Thanks everyone. @Dana is that using the kafka-python library?
Re: Resetting offsets
Thanks everyone. @Dana is that using the kafka-python library?

On Thu, May 4, 2017 at 4:52 AM, Dana Powers wrote:
> Requires stopping your existing consumers, but otherwise should work:
>
> from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
>
> def reset_offsets(group_id, topic, bootstrap_servers):
>     consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
>                              group_id=group_id)
>     consumer.assign([TopicPartition(topic, i) for i in
>                      consumer.partitions_for_topic(topic)])
>     consumer.commit({tp: OffsetAndMetadata(0, b'') for tp in
>                      consumer.assignment()})
>     consumer.close()
Re: Resetting offsets
Requires stopping your existing consumers, but otherwise should work:

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

def reset_offsets(group_id, topic, bootstrap_servers):
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
                             group_id=group_id)
    consumer.assign([TopicPartition(topic, i) for i in
                     consumer.partitions_for_topic(topic)])
    consumer.commit({tp: OffsetAndMetadata(0, b'') for tp in
                     consumer.assignment()})
    consumer.close()

On May 3, 2017 9:20 AM, "Ben Stopford" wrote:
> Hu is correct, there isn't anything currently, but there is an active
> proposal:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling
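The reset_offsets snippet in this message handles a single topic, while the original question asks about all topics of a group. What follows is a minimal sketch of one way to extend it, not a tested recipe. The names partitions_to_reset and reset_offsets_for_topics are hypothetical. The sketch assumes that partitions_for_topic returns None for a topic the client has no metadata for, and that OffsetAndMetadata takes two fields as in the snippet (some newer kafka-python releases may expect an additional leader_epoch field).

```python
def partitions_to_reset(partitions_for_topic, topics):
    """Return (topic, partition) pairs, in topic order, for every known topic.

    `partitions_for_topic` is any callable that maps a topic name to a set of
    partition ids, or to None when the topic is unknown (assumed behaviour).
    """
    pairs = []
    for topic in topics:
        partition_ids = partitions_for_topic(topic)
        if partition_ids is None:
            continue  # unknown topic: nothing to reset
        pairs.extend((topic, p) for p in sorted(partition_ids))
    return pairs


def reset_offsets_for_topics(group_id, topics, bootstrap_servers, offset=0):
    """Commit `offset` for every partition of `topics` on behalf of `group_id`.

    As with the single-topic version, the group's consumers must be stopped.
    """
    # Imported lazily so the helper above works without kafka-python installed.
    from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
                             group_id=group_id,
                             enable_auto_commit=False)
    try:
        pairs = partitions_to_reset(consumer.partitions_for_topic, topics)
        tps = [TopicPartition(t, p) for t, p in pairs]
        consumer.assign(tps)
        # Two-field form, as in the original snippet (version-dependent).
        consumer.commit({tp: OffsetAndMetadata(offset, '') for tp in tps})
    finally:
        consumer.close()
```

Keeping the partition lookup in a separate pure function makes the unknown-topic guard easy to check without a broker; the second function is the part that actually talks to Kafka.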
Re: Resetting offsets
Hu is correct, there isn't anything currently, but there is an active proposal:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling

On Wed, May 3, 2017 at 1:23 PM Hu Xi wrote:
> It seems there is no command-line tool out of the box, but you could write a
> simple Java client application that first calls 'seek' or 'seekToBeginning'
> to reset the offsets to what you expect and then invokes 'commitSync' to
> commit them.
Re: Resetting offsets
It seems there is no command-line tool out of the box, but you could write a simple Java client application that first calls 'seek' or 'seekToBeginning' to reset the offsets to what you expect and then invokes 'commitSync' to commit them.

From: Paul van der Linden
Sent: May 3, 2017 18:28
To: users@kafka.apache.org
Subject: Resetting offsets

I'm trying to reset the offsets for all partitions of all topics for a consumer group, but I can't seem to find a way that works.

The command-line tooling provides a way to remove a consumer group (which would be fine in this case), but this does not work with the "new"-style consumer groups. I tried to set consumer offsets with a client, which also didn't work (we are using confluent-kafka-python with librdkafka).

Is there any way to reset the offsets (preferably with Python or a command-line tool)?

Thanks
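The seek-then-commit approach described above is stated for the Java client. As a rough Python counterpart, here is a sketch that assumes a kafka-python-style consumer exposing assign, seek_to_beginning, position and commit. The function name seek_to_beginning_and_commit and the make_offset parameter are hypothetical, introduced only for illustration. Unlike committing a literal 0, this commits whatever the earliest available offset is, which matters once old log segments have been deleted and offset 0 no longer exists.

```python
def seek_to_beginning_and_commit(consumer, partitions, make_offset):
    """Rewind `partitions` to their earliest offsets and commit those positions.

    `consumer` is expected to behave like kafka-python's KafkaConsumer.
    `make_offset` turns an integer offset into the value the client's commit()
    expects (for kafka-python, an OffsetAndMetadata instance).
    """
    partitions = list(partitions)
    consumer.assign(partitions)
    consumer.seek_to_beginning(*partitions)
    # position() resolves the pending seek into a concrete offset.
    offsets = {tp: make_offset(consumer.position(tp)) for tp in partitions}
    consumer.commit(offsets)
    return offsets
```

With kafka-python, make_offset would presumably be something like `lambda o: OffsetAndMetadata(o, '')`, depending on the installed version. As with the other approaches in this thread, the group's own consumers should be stopped first.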
Resetting offsets
I'm trying to reset the offsets for all partitions of all topics for a consumer group, but I can't seem to find a way that works.

The command-line tooling provides a way to remove a consumer group (which would be fine in this case), but this does not work with the "new"-style consumer groups. I tried to set consumer offsets with a client, which also didn't work (we are using confluent-kafka-python with librdkafka).

Is there any way to reset the offsets (preferably with Python or a command-line tool)?

Thanks
Re: Resetting Offsets
Reading offsets looks like it's compatible across 0.8.1 and 0.8.2.

However, we cannot use the update logic in ImportZkOffsets, since we want to store offsets in the broker in 0.8.2.

It looks like SimpleConsumer.commitOffsets() would work with either version. Is there a better way?

-Suren

On Wednesday, February 18, 2015 11:49 AM, Michal Michalski wrote:

See https://cwiki.apache.org/confluence/display/KAFKA/System+Tools and check the following:

- GetOffsetShell (not very accurate - will set your offsets to much smaller values than you really need; we log offsets frequently in application logs and get them from there)
- ImportZkOffsets
Re: Resetting Offsets
See https://cwiki.apache.org/confluence/display/KAFKA/System+Tools and check the following:

- GetOffsetShell (not very accurate - will set your offsets to much smaller values than you really need; we log offsets frequently in application logs and get them from there)
- ImportZkOffsets

Kind regards,
Michał Michalski,
michal.michal...@boxever.com

On 18 February 2015 at 16:16, Surendranauth Hiraman wrote:
> We are using the High Level Consumer API to interact with Kafka.
>
> However, on restart in the case of failures, we want to be able to manually
> reset offsets in certain situations.
>
> What is the recommended way to do this?
>
> Should we use the Simple Consumer API just for this restart case?
>
> Ideally, it would be great to use the same approach in 0.8.1 and in 0.8.2.
Resetting Offsets
We are using the High Level Consumer API to interact with Kafka.

However, on restart in the case of failures, we want to be able to manually reset offsets in certain situations.

What is the recommended way to do this?

Should we use the Simple Consumer API just for this restart case?

Ideally, it would be great to use the same approach in 0.8.1 and in 0.8.2.

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

54 West 40th Street, 3RD FLOOR
NEW YORK, NY 10018
T: @suren_h
E: suren.hiraman@velos.io
W: www.velos.io