In case you still need a tool for this, I added one here
https://github.com/apache/storm/pull/2560.

2018-01-12 20:22 GMT+01:00 Stig Rohde Døssing <s...@apache.org>:

> There's a small discrepancy, in the old spout we track the offset the next
> batch should start at, while in storm-kafka-client we track the last
> emitted offset. So for example if offset 0, 1, 2 were emitted, the old
> spout would store offset = 0, nextOffset = 3. The new spout would store
> firstOffset = 0, lastOffset = 2. I think it should be lastOffset =
> nextOffset - 1. Other than that I agree with your mapping.
>
> 2018-01-12 <20%2018%2001%2012> 3:43 GMT+01:00 Nasron Cheong <
> nas...@gmail.com>:
>
>> Hi Stig,
>>
>> That's great! Thanks for all the info. Looking through the code, one
>> small detail is the difference between storm-kafka-client's format and
>> storm-kafka. The former uses 'firstOffset' and 'lastOffset' and the latter
>> uses 'offset' and 'nextOffset'.
>>
>> So, can I map with
>>
>> firstOffset = offset
>>
>> and
>>
>> lastOffset = nextOffset+1 ?
>>
>> Looking through the code it seems to be that nextOffset is placed after
>> the last consumed message, but I'm not sure.
>>
>> Thanks
>>
>> - Nasron
>>
>> On Thu, Jan 11, 2018 at 5:43 PM, Stig Rohde Døssing <s...@apache.org>
>> wrote:
>>
>>> Nasron,
>>>
>>> Okay, migrating a Trident spout is a very different thing. Trident
>>> spouts store their state in Storm's zookeeper (unless you decide otherwise
>>> by setting transactional.zookeeper.servers in storm.yaml). This also
>>> applies to the storm-kafka-client Trident spout, so we won't need to move
>>> offsets into Kafka.
>>>
>>> The idea of stopping all the producers and starting at LATEST (or
>>> UNCOMMITTED_LATEST) is decent, but as you note there's a (small) risk of
>>> skipping tuples. In order to get Trident to commit something, you have to
>>> deploy the new topology with LATEST and start the producers again, wait
>>> until at least one commit happens, and then take the topology back down and
>>> redeploy with whatever your first poll strategy normally is. If the worker
>>> crashes before the spout manages to commit something, you will skip tuples.
>>>
>>> If you don't want to do that, here's my notes on storm-kafka ->
>>> storm-kafka-client for Trident:
>>>
>>> The storage formats and zk paths for the two spouts are a little
>>> different. Both spouts store their state as JSON maps, but some of the keys
>>> are different. I use ${} below to indicate variable substitution.
>>>
>>> The root path (in the following: zkRoot) for your spouts data is
>>> /${transactional.zookeeper.root from storm.yaml}/${txId you set with
>>> TopologyBuilder.newStream}/user.
>>>
>>> For the storm-kafka spout the offsets are stored in one of the following
>>> two paths:
>>> ${zkRoot}/${topicName}partition_${partition} if you are using wildcard
>>> topic subscriptions
>>> ${zkRoot}/partition_${partition} otherwise
>>>
>>> The storage format for storm-kafka is as follows:
>>> { "${topicName}partition_${partition}': {"offset": 0, "nextOffset": 2 }
>>> } if you are using wildcard topic subscriptions
>>> { "${topicName}partition_${partition}': {"offset": 0, "nextOffset": 2 }
>>> } otherwise (I left out some irrelevant properties)
>>>
>>> For storm-kafka-client the zk path is
>>> ${zkRoot}/${topicName}@${partition}
>>>
>>> and the storage format is
>>> { "${topicName}@${partition}': {"firstOffset": 0, "nextOffset": 2 } }
>>>
>>> In order to migrate from storm-kafka to storm-kafka-client, we need to
>>> stop the topology and run a script that moves the offsets from the old
>>> location/format to the new location/format. There's no way to tell Trident
>>> to read from one path/format and write to another, so it has to be done
>>> offline. Once the offsets are migrated, the spout can be replaced in the
>>> topology and the topology can be redeployed.
>>>
>>> I might look at writing an application that can do this at some point,
>>> but it might take me a while. If you'd like to look at it yourself, here's
>>> some pointers where to start:
>>> * This is where the offset are written to Zookeeper, assuming you use an
>>> opaque spout https://github.com/apache/stor
>>> m/blob/master/storm-client/src/jvm/org/apache/storm/trident/
>>> spout/OpaquePartitionedTridentSpoutExecutor.java#L184. You might want
>>> to look at this class for a bit (particularly the emit function), because
>>> it's pretty useful for understanding how/where Trident stores metadata for
>>> spouts.
>>> * The return value of https://github.com/apache/stor
>>> m/blob/master/external/storm-kafka/src/jvm/org/apache/storm/
>>> kafka/trident/TridentKafkaEmitter.java#L85 defines the format of what's
>>> being saved to Zookeeper for storm-kafka. It's being wrapped in a map so
>>> the full written value is { "${topicName}partition_${partition}':
>>> ${theReturnValue} } (see the storage format note above, it's different if
>>> you're not using wildcard subscriptions)
>>> * Similarly for storm-kafka-client the return value of
>>> https://github.com/apache/storm/blob/master/external/storm-k
>>> afka-client/src/main/java/org/apache/storm/kafka/spout/tride
>>> nt/KafkaTridentSpoutEmitter.java#L106 defines the format of what that
>>> spout saves to Zookeeper (and expects to find).
>>> * You should use zkCli (it's in your zookeeper/bin directory) to explore
>>> your Zookeeper filesystem. It should be pretty easy to find your offsets in
>>> there with that tool.
>>>
>>> Sorry about the wall of text, this turned out to have a lot of detail to
>>> cover.
>>>
>>> 2018-01-10 21:40 GMT+01:00 Nasron Cheong <nas...@gmail.com>:
>>>
>>>> Thanks Stig,
>>>>
>>>> So after some digging, I realized we are really migrating from the
>>>> kafka trident emitter in storm-kafka, to the trident emitter in
>>>> storm-kafka-client.
>>>>
>>>> As far as I can see, the offset information is still stored in zk, and
>>>> the offset info for storm-kafka is (https://github.com/apache/sto
>>>> rm/blob/master/external/storm-kafka/src/jvm/org/apache/storm
>>>> /kafka/trident/TridentKafkaEmitter.java#L140)
>>>>
>>>> However this seems quite different from storm-kafka-client, which uses
>>>> https://github.com/apache/storm/blob/master/external/st
>>>> orm-kafka-client/src/main/java/org/apache/storm/kafka/spout/
>>>> trident/KafkaTridentSpoutBatchMetadata.java#L56
>>>>
>>>> I'm not sure under which zknode this information is stored - and if the
>>>> zknode itself is different between the two implementations.
>>>>
>>>> Looks like I need a tool to copy the stored values in zk from old
>>>> storm-kafka to storm-kafka-client?
>>>>
>>>> Another option I suppose is to:
>>>> - stop topic producers
>>>> - run the old code until it drains all topics
>>>> - start new code with FirstPollOffsetStrategy.LATEST
>>>>
>>>> Although this seems risky.
>>>>
>>>> Thanks!
>>>>
>>>> - Nasron
>>>>
>>>>
>>>> On Thu, Dec 21, 2017 at 4:23 PM, Stig Rohde Døssing <s...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Nasron,
>>>>>
>>>>> I don't believe there's currently a tool to help you migrate. We did
>>>>> it manually by writing a small utility that looked up the commit offsets 
>>>>> in
>>>>> Storm's Zookeeper, opened a KafkaConsumer with the new consumer group id
>>>>> and committed the offsets for the appropriate partitions. We stopped our
>>>>> topologies, used this utility and redeployed with the new spout.
>>>>>
>>>>> Assuming there isn't already a tool for migration floating around
>>>>> somewhere, I think we could probably build some migration support into the
>>>>> storm-kafka-client spout. If the path to the old offsets in Storm's
>>>>> Zookeeper is given, we might be able to extract them and start up the new
>>>>> spout from there.
>>>>>
>>>>> 2017-12-19 21:59 GMT+01:00 Nasron Cheong <nas...@gmail.com>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm trying to determine steps for migration to the storm-kafka-client
>>>>>> in order to use the new kafka client.
>>>>>>
>>>>>> It's not quite clear to me how offsets are migrated - is there a
>>>>>> specific set of steps to ensure offsets are moved from the ZK based 
>>>>>> offsets
>>>>>> into the kafka based offsets?
>>>>>>
>>>>>> Or is the original configuration respected, and storm-kafka-client
>>>>>> can mostly be a drop in replacement?
>>>>>>
>>>>>> I want to avoid having spouts reset to the beginning of topics after
>>>>>> deployment, due to this change.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> - Nasron
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to