
Okay, migrating a Trident spout is a very different thing. Trident spouts
store their state in Storm's Zookeeper (unless you decide otherwise by
setting transactional.zookeeper.servers in storm.yaml). This also applies
to the storm-kafka-client Trident spout, so we won't need to move offsets
into Kafka.

The idea of stopping all the producers and starting at LATEST (or
UNCOMMITTED_LATEST) is decent, but as you note there's a (small) risk of
skipping tuples. In order to get Trident to commit something, you have to
deploy the new topology with LATEST and start the producers again, wait
until at least one commit happens, and then take the topology back down and
redeploy with whatever your first poll strategy normally is. If the worker
crashes before the spout manages to commit something, you will skip tuples.

If you don't want to do that, here are my notes on migrating from
storm-kafka to storm-kafka-client for Trident:

The storage formats and zk paths for the two spouts are a little different.
Both spouts store their state as JSON maps, but some of the keys are
different. I use ${} below to indicate variable substitution.

The root path (in the following: zkRoot) for your spout's data is
/${transactional.zookeeper.root from storm.yaml}/${txId you set with

For the storm-kafka spout the offsets are stored in one of the following
two paths:
${zkRoot}/${topicName}partition_${partition} if you are using wildcard
topic subscriptions
${zkRoot}/partition_${partition} otherwise

The storage format for storm-kafka is as follows:
{ "${topicName}partition_${partition}": {"offset": 0, "nextOffset": 2 } }
if you are using wildcard topic subscriptions
{ "${topicName}partition_${partition}": {"offset": 0, "nextOffset": 2 } }
otherwise (I left out some irrelevant properties)

For storm-kafka-client the zk path is

and the storage format is
{ "${topicName}@${partition}": {"firstOffset": 0, "nextOffset": 2 } }
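
To make the difference between the two formats concrete, here is a rough Python sketch of the per-partition conversion. It is not an existing tool. The function name convert_entry and the default_topic parameter are made up for illustration, and it assumes the old "offset" value maps directly onto the new "firstOffset", which you should verify against the two metadata classes before relying on it.

```python
import json
import re

# Assumed old key layout: "<topic>partition_<n>" (the topic part may be empty).
_OLD_KEY = re.compile(r"^(?P<topic>.*)partition_(?P<partition>\d+)$")


def convert_entry(old_json, default_topic=None):
    """Convert one storm-kafka state map (JSON text) to the storm-kafka-client layout."""
    old = json.loads(old_json)
    new = {}
    for key, meta in old.items():
        match = _OLD_KEY.match(key)
        if match is None:
            raise ValueError("unrecognized partition key: %r" % key)
        # Fall back to a caller-supplied topic when the key carries no topic name.
        topic = match.group("topic") or default_topic
        if not topic:
            raise ValueError("no topic name available for key %r" % key)
        new["%s@%s" % (topic, match.group("partition"))] = {
            # Assumption: the old "offset" corresponds to the new "firstOffset".
            "firstOffset": meta["offset"],
            "nextOffset": meta["nextOffset"],
        }
    return json.dumps(new, sort_keys=True)
```

Any of the properties I left out of the formats above are dropped by this sketch, so check whether the new spout needs them.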

In order to migrate from storm-kafka to storm-kafka-client, we need to stop
the topology and run a script that moves the offsets from the old
location/format to the new location/format. There's no way to tell Trident
to read from one path/format and write to another, so it has to be done
offline. Once the offsets are migrated, the spout can be replaced in the
topology and the topology can be redeployed.
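
The offline step could look roughly like the Python sketch below. It is only an outline under some assumptions: the client argument is any object with get, ensure_path and set methods (the kazoo library's KazooClient offers these, but nothing here depends on kazoo), and migrate_offsets, new_path_for and convert are hypothetical names. You have to supply the path mapping and the format conversion yourself.

```python
def migrate_offsets(client, old_paths, new_path_for, convert):
    """Copy spout state from each old znode to its new location and format.

    client       -- object exposing get(path) -> (data, stat),
                    ensure_path(path) and set(path, data)
    old_paths    -- iterable of znode paths written by the old spout
    new_path_for -- function mapping an old path to the new spout's path
    convert      -- function mapping the old stored bytes to the new bytes
    """
    migrated = []
    for old_path in old_paths:
        data, _stat = client.get(old_path)
        new_path = new_path_for(old_path)
        # Create the target znode (and any missing parents) before writing.
        client.ensure_path(new_path)
        client.set(new_path, convert(data))
        migrated.append((old_path, new_path))
    return migrated
```

The old znodes are left untouched on purpose, so you can roll back to the old spout if the new topology misbehaves. Run this only while the topology is stopped.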

I might look at writing an application that can do this at some point, but
it might take me a while. If you'd like to look at it yourself, here are
some pointers on where to start:
* This is where the offsets are written to Zookeeper, assuming you use an
opaque spout:
client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutE
You might want to look at this class for a bit
(particularly the emit function), because it's pretty useful for
understanding how/where Trident stores metadata for spouts.
* The return value of
storm-kafka/src/jvm/org/apache/storm/kafka/trident/
defines the format of what's being saved to
Zookeeper for storm-kafka. It's wrapped in a map, so the full written
value is { "${topicName}partition_${partition}": ${theReturnValue} } (see
the storage format note above, it's different if you're not using wildcard
topic subscriptions).
* Similarly, for storm-kafka-client the return value of
storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/
defines the format of what that spout
saves to Zookeeper (and expects to find).
* You should use zkCli (it's in your zookeeper/bin directory) to explore
your Zookeeper filesystem. It should be pretty easy to find your offsets in
there with that tool.
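
If you'd rather dump the tree from a script than click through zkCli, something like the following Python sketch works. As before, client is assumed to be any object with get and get_children methods (kazoo's KazooClient has both), and walk_znodes is just a name I made up.

```python
def walk_znodes(client, root):
    """Yield (path, data) for root and every znode beneath it, depth first."""
    stack = [root]
    while stack:
        path = stack.pop()
        data, _stat = client.get(path)
        yield path, data
        # Push children in reverse order so they are visited alphabetically.
        for child in sorted(client.get_children(path), reverse=True):
            stack.append(path.rstrip("/") + "/" + child)
```

Pointing it at your zkRoot and printing each pair should show where the offsets live and what the stored JSON looks like.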

Sorry about the wall of text, this turned out to have a lot of detail to

2018-01-10 21:40 GMT+01:00 Nasron Cheong <>:

> Thanks Stig,
> So after some digging, I realized we are really migrating from the kafka
> trident emitter in storm-kafka, to the trident emitter in
> storm-kafka-client.
> As far as I can see, the offset information is still stored in zk, and the
> offset info for storm-kafka is (
> storm/blob/master/external/storm-kafka/src/jvm/org/
> apache/storm/kafka/trident/
> However this seems quite different from storm-kafka-client, which uses
> src/main/java/org/apache/storm/kafka/spout/trident/
> I'm not sure under which zknode this information is stored - and if the
> zknode itself is different between the two implementations.
> Looks like I need a tool to copy the stored values in zk from old
> storm-kafka to storm-kafka-client?
> Another option I suppose is to:
> - stop topic producers
> - run the old code until it drains all topics
> - start new code with FirstPollOffsetStrategy.LATEST
> Although this seems risky.
> Thanks!
> - Nasron
> On Thu, Dec 21, 2017 at 4:23 PM, Stig Rohde Døssing <>
> wrote:
>> Hi Nasron,
>> I don't believe there's currently a tool to help you migrate. We did it
>> manually by writing a small utility that looked up the commit offsets in
>> Storm's Zookeeper, opened a KafkaConsumer with the new consumer group id
>> and committed the offsets for the appropriate partitions. We stopped our
>> topologies, used this utility and redeployed with the new spout.
>> Assuming there isn't already a tool for migration floating around
>> somewhere, I think we could probably build some migration support into the
>> storm-kafka-client spout. If the path to the old offsets in Storm's
>> Zookeeper is given, we might be able to extract them and start up the new
>> spout from there.
>> 2017-12-19 21:59 GMT+01:00 Nasron Cheong <>:
>>> Hi,
>>> I'm trying to determine steps for migration to the storm-kafka-client in
>>> order to use the new kafka client.
>>> It's not quite clear to me how offsets are migrated - is there a
>>> specific set of steps to ensure offsets are moved from the ZK based offsets
>>> into the kafka based offsets?
>>> Or is the original configuration respected, and storm-kafka-client can
>>> mostly be a drop in replacement?
>>> I want to avoid having spouts reset to the beginning of topics after
>>> deployment, due to this change.
>>> Thanks.
>>> - Nasron
