In case you still need a tool for this, I added one here https://github.com/apache/storm/pull/2560.
2018-01-12 20:22 GMT+01:00 Stig Rohde Døssing <s...@apache.org>:

> There's a small discrepancy: the old spout tracks the offset the next
> batch should start at, while storm-kafka-client tracks the last emitted
> offset. For example, if offsets 0, 1 and 2 were emitted, the old spout
> would store offset = 0, nextOffset = 3, while the new spout would store
> firstOffset = 0, lastOffset = 2. I think it should be lastOffset =
> nextOffset - 1. Other than that I agree with your mapping.
>
> 2018-01-12 3:43 GMT+01:00 Nasron Cheong <nas...@gmail.com>:
>
>> Hi Stig,
>>
>> That's great! Thanks for all the info. Looking through the code, one
>> small detail is the difference between storm-kafka-client's format and
>> storm-kafka's. The former uses 'firstOffset' and 'lastOffset' and the
>> latter uses 'offset' and 'nextOffset'.
>>
>> So, can I map with
>>
>> firstOffset = offset
>>
>> and
>>
>> lastOffset = nextOffset + 1 ?
>>
>> Looking through the code it seems that nextOffset is placed after the
>> last consumed message, but I'm not sure.
>>
>> Thanks
>>
>> - Nasron
>>
>> On Thu, Jan 11, 2018 at 5:43 PM, Stig Rohde Døssing <s...@apache.org>
>> wrote:
>>
>>> Nasron,
>>>
>>> Okay, migrating a Trident spout is a very different thing. Trident
>>> spouts store their state in Storm's Zookeeper (unless you decide
>>> otherwise by setting transactional.zookeeper.servers in storm.yaml).
>>> This also applies to the storm-kafka-client Trident spout, so we
>>> won't need to move offsets into Kafka.
>>>
>>> The idea of stopping all the producers and starting at LATEST (or
>>> UNCOMMITTED_LATEST) is decent, but as you note there's a (small) risk
>>> of skipping tuples. In order to get Trident to commit something, you
>>> have to deploy the new topology with LATEST and start the producers
>>> again, wait until at least one commit happens, and then take the
>>> topology back down and redeploy with whatever your first poll
>>> strategy normally is.
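The key-and-offset mapping the thread settles on (firstOffset = offset, lastOffset = nextOffset - 1) can be sketched as a small helper. This is an illustrative sketch; the function name is mine, only the two JSON keys come from the thread:

```python
def old_to_new_offsets(old_meta):
    """Translate storm-kafka Trident metadata values to the
    storm-kafka-client equivalents.

    The old spout stores the offset the next batch should start at
    (nextOffset); the new spout stores the last emitted offset,
    hence lastOffset = nextOffset - 1.
    """
    return {
        "firstOffset": old_meta["offset"],
        "lastOffset": old_meta["nextOffset"] - 1,
    }

# Offsets 0, 1, 2 emitted: old spout stores offset=0, nextOffset=3.
print(old_to_new_offsets({"offset": 0, "nextOffset": 3}))
# -> {'firstOffset': 0, 'lastOffset': 2}
```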
If the worker
>>> crashes before the spout manages to commit something, you will skip
>>> tuples.
>>>
>>> If you don't want to do that, here are my notes on storm-kafka ->
>>> storm-kafka-client for Trident:
>>>
>>> The storage formats and zk paths for the two spouts are a little
>>> different. Both spouts store their state as JSON maps, but some of
>>> the keys are different. I use ${} below to indicate variable
>>> substitution.
>>>
>>> The root path (in the following: zkRoot) for your spout's data is
>>> /${transactional.zookeeper.root from storm.yaml}/${txId you set with
>>> TopologyBuilder.newStream}/user.
>>>
>>> For the storm-kafka spout the offsets are stored in one of the
>>> following two paths:
>>> ${zkRoot}/${topicName}partition_${partition} if you are using
>>> wildcard topic subscriptions
>>> ${zkRoot}/partition_${partition} otherwise
>>>
>>> The storage format for storm-kafka is as follows (I left out some
>>> irrelevant properties):
>>> { "${topicName}partition_${partition}": {"offset": 0, "nextOffset": 2} }
>>> if you are using wildcard topic subscriptions
>>> { "${topicName}partition_${partition}": {"offset": 0, "nextOffset": 2} }
>>> otherwise
>>>
>>> For storm-kafka-client the zk path is
>>> ${zkRoot}/${topicName}@${partition}
>>>
>>> and the storage format is
>>> { "${topicName}@${partition}": {"firstOffset": 0, "lastOffset": 2} }
>>>
>>> In order to migrate from storm-kafka to storm-kafka-client, we need
>>> to stop the topology and run a script that moves the offsets from the
>>> old location/format to the new location/format. There's no way to
>>> tell Trident to read from one path/format and write to another, so it
>>> has to be done offline. Once the offsets are migrated, the spout can
>>> be replaced in the topology and the topology can be redeployed.
>>>
>>> I might look at writing an application that can do this at some
>>> point, but it might take me a while.
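The offline path-and-payload translation described above can be sketched as a pure function (no Zookeeper I/O; the function name, argument names and the example node values are mine, the path and key patterns come from the thread; a real tool would read and write the nodes with a Zookeeper client such as kazoo):

```python
import json


def migrate_node(zk_root, topic, partition, old_json, wildcard=True):
    """Translate one storm-kafka Trident zk node into the path and JSON
    that storm-kafka-client's Trident spout expects.

    Sketch of the offline migration step; zk_root is the
    /${transactional.zookeeper.root}/${txId}/user path.
    """
    # Old layout: key "${topicName}partition_${partition}"; the node
    # path differs depending on wildcard topic subscriptions.
    old_key = "%spartition_%d" % (topic, partition)
    old_path = ("%s/%spartition_%d" % (zk_root, topic, partition)
                if wildcard
                else "%s/partition_%d" % (zk_root, partition))
    meta = json.loads(old_json)[old_key]

    # New layout: key and node are both "${topicName}@${partition}",
    # with firstOffset/lastOffset instead of offset/nextOffset.
    new_key = "%s@%d" % (topic, partition)
    new_path = "%s/%s" % (zk_root, new_key)
    new_json = json.dumps({new_key: {
        "firstOffset": meta["offset"],
        "lastOffset": meta["nextOffset"] - 1,
    }})
    return old_path, new_path, new_json
```

For example, a node holding `{"mytopicpartition_0": {"offset": 0, "nextOffset": 2}}` under `/transactional/mytx/user` would be rewritten to `/transactional/mytx/user/mytopic@0` holding `{"mytopic@0": {"firstOffset": 0, "lastOffset": 1}}`.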
If you'd like to look
>>> at it yourself, here are some pointers on where to start:
>>>
>>> * This is where the offsets are written to Zookeeper, assuming you
>>> use an opaque spout:
>>> https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L184.
>>> You might want to look at this class for a bit (particularly the emit
>>> function), because it's pretty useful for understanding how/where
>>> Trident stores metadata for spouts.
>>> * The return value of
>>> https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java#L85
>>> defines the format of what's being saved to Zookeeper for
>>> storm-kafka. It's being wrapped in a map, so the full written value
>>> is { "${topicName}partition_${partition}": ${theReturnValue} } (see
>>> the storage format note above; it's different if you're not using
>>> wildcard subscriptions).
>>> * Similarly, for storm-kafka-client the return value of
>>> https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L106
>>> defines the format of what that spout saves to Zookeeper (and
>>> expects to find).
>>> * You should use zkCli (it's in your zookeeper/bin directory) to
>>> explore your Zookeeper filesystem. It should be pretty easy to find
>>> your offsets in there with that tool.
>>>
>>> Sorry about the wall of text; this turned out to have a lot of detail
>>> to cover.
>>>
>>> 2018-01-10 21:40 GMT+01:00 Nasron Cheong <nas...@gmail.com>:
>>>
>>>> Thanks Stig,
>>>>
>>>> So after some digging, I realized we are really migrating from the
>>>> kafka trident emitter in storm-kafka to the trident emitter in
>>>> storm-kafka-client.
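The map-wrapping described in the second pointer above can be illustrated with a tiny sketch (the function name and example values are mine; only the key pattern comes from the thread, and the wildcard-subscription key is shown):

```python
import json


def wrap_emitter_metadata(topic, partition, emitter_return_value):
    """storm-kafka wraps the Trident emitter's return value in a
    single-entry map keyed by "${topicName}partition_${partition}"
    before writing it to Zookeeper."""
    key = "%spartition_%d" % (topic, partition)
    return json.dumps({key: emitter_return_value})


print(wrap_emitter_metadata("mytopic", 0, {"offset": 0, "nextOffset": 2}))
```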
>>>>
>>>> As far as I can see, the offset information is still stored in zk,
>>>> and the offset info for storm-kafka is defined here:
>>>> https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java#L140
>>>>
>>>> However this seems quite different from storm-kafka-client, which
>>>> uses
>>>> https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java#L56
>>>>
>>>> I'm not sure under which zknode this information is stored - and if
>>>> the zknode itself is different between the two implementations.
>>>>
>>>> Looks like I need a tool to copy the stored values in zk from the
>>>> old storm-kafka to storm-kafka-client?
>>>>
>>>> Another option I suppose is to:
>>>> - stop topic producers
>>>> - run the old code until it drains all topics
>>>> - start the new code with FirstPollOffsetStrategy.LATEST
>>>>
>>>> Although this seems risky.
>>>>
>>>> Thanks!
>>>>
>>>> - Nasron
>>>>
>>>> On Thu, Dec 21, 2017 at 4:23 PM, Stig Rohde Døssing <s...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Nasron,
>>>>>
>>>>> I don't believe there's currently a tool to help you migrate. We
>>>>> did it manually by writing a small utility that looked up the
>>>>> commit offsets in Storm's Zookeeper, opened a KafkaConsumer with
>>>>> the new consumer group id and committed the offsets for the
>>>>> appropriate partitions. We stopped our topologies, used this
>>>>> utility and redeployed with the new spout.
>>>>>
>>>>> Assuming there isn't already a tool for migration floating around
>>>>> somewhere, I think we could probably build some migration support
>>>>> into the storm-kafka-client spout. If the path to the old offsets
>>>>> in Storm's Zookeeper is given, we might be able to extract them and
>>>>> start up the new spout from there.
>>>>>
>>>>> 2017-12-19 21:59 GMT+01:00 Nasron Cheong <nas...@gmail.com>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm trying to determine the steps for migrating to
>>>>>> storm-kafka-client in order to use the new kafka client.
>>>>>>
>>>>>> It's not quite clear to me how offsets are migrated - is there a
>>>>>> specific set of steps to ensure offsets are moved from the
>>>>>> ZK-based offsets to the kafka-based offsets?
>>>>>>
>>>>>> Or is the original configuration respected, so that
>>>>>> storm-kafka-client can mostly be a drop-in replacement?
>>>>>>
>>>>>> I want to avoid having spouts reset to the beginning of topics
>>>>>> after deployment, due to this change.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> - Nasron
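The manual utility described in the Dec 21 reply (read offsets from Storm's Zookeeper, then commit them to Kafka under the new consumer group) hinges on one detail: Kafka interprets a committed offset as the next offset to consume. A minimal sketch of that computation, assuming the state read from Zookeeper carries a nextOffset field as in the Trident format discussed above (the exact key depends on which spout's state you read); the Zookeeper read and the actual KafkaConsumer commit call are deliberately left out:

```python
def offsets_to_commit(zk_state):
    """zk_state: {(topic, partition): metadata dict read from Storm's
    Zookeeper}. Returns {(topic, partition): offset to commit}.

    Kafka's committed offset means "next offset to consume", which
    matches nextOffset directly, so no adjustment is needed.
    """
    return {tp: meta["nextOffset"] for tp, meta in zk_state.items()}


# A real utility would then do roughly (hypothetical wiring, not run here):
#   consumer = KafkaConsumer(group_id=NEW_GROUP_ID, ...)
#   consumer.commit({TopicPartition(t, p): OffsetAndMetadata(off, None)
#                    for (t, p), off in offsets_to_commit(state).items()})
```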