You can do this in a few ways. The first will work on your current Kafka
version.

* Use
https://github.com/apache/storm/blob/v1.1.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L348
with LATEST to make your spouts start from the latest offset. Once your
topology is deployed, the spout will start consuming from the latest
offset. You will want to let it run for a little bit to ensure that your
spouts commit the new offset to all partitions. Once all spouts have
committed, you can reset your FirstPollOffsetStrategy to whatever you're
using now, and redeploy.

* Upgrade to Kafka 0.11 and use the consumer groups tool as described below
(The following is a copy paste of an earlier post on this list):

Hi.

You should be able to do this with the storm-kafka-client spout and a
sufficiently new Kafka cluster (0.11, see the documentation at
https://cwiki.apache.org/confluence/display/KAFKA/KIP-
122%3A+Add+Reset+Consumer+Group+Offsets+tooling). Kafka ships with the
kafka/bin/kafka-consumer-groups.sh script. You can use this script with the
--reset-offsets option to reset offsets for a consumer group based on a
bunch of conditions (timestamp, offset and others).

Note that I haven't tried to do this, but I'd imagine you'd need to do
something like this:
* Ensure your kafka spout is configured to use the UNCOMMITTED_EARLIEST or
UNCOMMITTED_LATEST as its FirstPollOffsetStrategy.
* Find the consumer group your spout is running as.
* Stop (kill) the topology containing the spout you want to reset. You need
to do this to avoid having the spout commit offsets while you're working
with the kafka-consumer-groups tool. If you reset the offset while the
spout is running, it will ignore the reset.
* Use kafka-consumer-groups.sh to reset the offsets for your consumer
group. Reference the KIP link to see how to do this, or just run
kafka-consumer-groups.sh to get it to print usage.
* Redeploy your topology

2017-08-29 10:05 GMT+02:00 pradeep s <sreekumar.prad...@gmail.com>:

> Hi Stig,
> Please find the details
> Storm version 1.1.0. Using latest kafka spout which stores offsets inside
>  kafka _consumer_offsets topic .
>
> storm-kafka-client dependency used : version 1.1.1
>
> Kafka version is 0.10.1.0
>
> Thanks
>
> Pradeep
>
> On Tue, Aug 29, 2017 at 12:18 AM, Stig Rohde Døssing <s...@apache.org>
> wrote:
>
>> Hi Pradeep,
>>
>> I think it will be easier to give advice if we know the following:
>>
>> Which Storm version are you on?
>> Which spout are you using? Is it storm-kafka or storm-kafka-client, and
>> is it Trident or regular Storm?
>> Which Kafka version are you on?
>>
>> 2017-08-29 <20%2017%2008%2029> 7:33 GMT+02:00 pradeep s <
>> sreekumar.prad...@gmail.com>:
>>
>>> Hi,
>>> I want reset the storm consumer to latest offset for avoiding few
>>> messages .
>>>
>>> I followed below steps
>>>
>>>
>>>    1. Stop storm consumer
>>>    2. Reset the retention period for the topic to 1 ms
>>>    3. Wait for few fins
>>>    4. Reset the retention period back to original value of 5 days
>>>    5. Start storm consumer
>>>
>>> Is this approach ok in moving storm spout to latest offset..
>>> I am seeing that Kafka manager still shows a lag for the topic
>>>
>>>
>>> thanks
>>> pradeep
>>>
>>
>>
>

Reply via email to