First, there is a small "typo" kind of error in:
https://github.com/apache/incubator-storm/blob/master/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
line 217:
 if (lastCompletedOffset != lastCompletedOffset) {
I guess it should be something like:
 if (_committedTo != lastCompletedOffset) {
As written, the condition compares a variable with itself, so it is always
false and the position information is never saved.
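
To make the effect concrete, here is a minimal sketch of the two forms of the
check. It is not the real PartitionManager: the class CommitSketch, its
methods and the writes counter are hypothetical stand-ins, and the actual
write to ZooKeeper is replaced by a simple field update.

```java
// Hypothetical stand-in for the commit logic; NOT the real PartitionManager.
class CommitSketch {
    private long committedTo;   // last offset considered saved
    private int writes = 0;     // how many times we "saved" the position

    CommitSketch(long committedTo) {
        this.committedTo = committedTo;
    }

    // Form from line 217: a long compared with itself is never unequal,
    // so the body is never executed and nothing is saved.
    boolean commitBuggy(long lastCompletedOffset) {
        if (lastCompletedOffset != lastCompletedOffset) {
            committedTo = lastCompletedOffset;
            writes++;
            return true;
        }
        return false;
    }

    // Suggested form: save only when the completed offset has moved past
    // what was last saved.
    boolean commitFixed(long lastCompletedOffset) {
        if (committedTo != lastCompletedOffset) {
            committedTo = lastCompletedOffset;
            writes++;
            return true;
        }
        return false;
    }

    long committedTo() { return committedTo; }
    int writes() { return writes; }

    public static void main(String[] args) {
        CommitSketch s = new CommitSketch(0L);
        System.out.println("buggy saved: " + s.commitBuggy(42L));
        System.out.println("fixed saved: " + s.commitFixed(42L));
    }
}
```

With the suggested form, a second call with the same offset saves nothing
again, which is presumably what the original check was meant to achieve.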

The next thing is the Kafka bolt.
Taking the topic name from the global configuration is probably not the
best way: I may want to attach two bolts and have each send to a different
queue. Something like SpoutConfig for the spout would probably work better
for the bolt as well (so you can pass the queue name, or maybe even a
different zk broker list if wanted).
For example:
collector.emit("stream1",new Values(v1));
collector.emit("stream2",new Values(v2));
and I'll bind each stream to a different Kafka bolt so that each message is
sent to the right queue.
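
A rough sketch of what I mean by a per-bolt configuration follows. Nothing
here exists in storm-kafka: KafkaBoltConfig, PerBoltConfigSketch, the broker
addresses and the queue names are all made up for illustration, and the code
deliberately avoids the Storm classes so it can be run on its own.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical per-bolt configuration, similar in spirit to SpoutConfig.
// This class is an assumption for illustration, not a storm-kafka API.
final class KafkaBoltConfig {
    final String brokerList;  // brokers this bolt would send to
    final String topic;       // queue (topic) this bolt would write to

    KafkaBoltConfig(String brokerList, String topic) {
        this.brokerList = brokerList;
        this.topic = topic;
    }
}

class PerBoltConfigSketch {
    // One config per output stream. In a real topology each entry would be
    // handed to its own bolt instance, and that bolt would subscribe to the
    // stream with shuffleGrouping(componentId, streamId).
    static Map<String, KafkaBoltConfig> bindings() {
        Map<String, KafkaBoltConfig> byStream = new LinkedHashMap<>();
        // Placeholder hosts and queue names, not real endpoints.
        byStream.put("stream1", new KafkaBoltConfig("broker1:9092", "queue1"));
        byStream.put("stream2", new KafkaBoltConfig("broker2:9092", "queue2"));
        return byStream;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, KafkaBoltConfig> e : bindings().entrySet()) {
            KafkaBoltConfig c = e.getValue();
            System.out.println(e.getKey() + " -> " + c.topic + " @ " + c.brokerList);
        }
    }
}
```

The point of the design is that the topic and broker list live with the bolt
instance rather than in the topology-wide configuration, so two bolts in the
same topology no longer have to share one topic.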

Haralds
