Basically any IRichSpout is OK; you just get a non-transactional
topology:
https://github.com/nathanmarz/storm/wiki/Trident-spouts
https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/TridentTopology.java#L113

In the end, if I comment out the "context.registerMetric"-related code in
KafkaSpout, it works.

However, I don't have this problem at all when I use the transactional
spout.
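
To illustrate why removing the registerMetric call makes a difference, here is
a toy model of what the stack trace suggests: KafkaSpout.open() is reached from
RichSpoutEmitter.emitBatch(), that is, while a batch is being processed rather
than during executor setup, and registerMetric refuses to run at that point.
This is only a sketch under that assumption, not Storm's actual code. ToyContext,
ToySpout, LazyBatchEmitter and RegisterMetricDemo are all hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TopologyContext: metrics may only be
// registered until the executor marks its setup phase as finished.
class ToyContext {
    private boolean setupFinished = false;
    private final Map<String, Object> metrics = new HashMap<>();

    void registerMetric(String name, Object metric) {
        if (setupFinished) {
            throw new IllegalStateException(
                "registerMetric can only be called during prepare()/open()");
        }
        metrics.put(name, metric);
    }

    void finishSetup() { setupFinished = true; }

    int metricCount() { return metrics.size(); }
}

// Hypothetical spout that registers a metric inside open().
class ToySpout {
    boolean opened = false;

    void open(ToyContext context) {
        context.registerMetric("toyMetric", new Object());
        opened = true;
    }
}

// Hypothetical wrapper that opens the spout lazily on the first batch
// (assumed from the trace, where open() appears under emitBatch()).
class LazyBatchEmitter {
    private final ToySpout spout;
    private final ToyContext context;
    private boolean prepared = false;

    LazyBatchEmitter(ToySpout spout, ToyContext context) {
        this.spout = spout;
        this.context = context;
    }

    void emitBatch() {
        if (!prepared) {
            spout.open(context); // too late if setup has already finished
            prepared = true;
        }
    }
}

class RegisterMetricDemo {
    // True if the lazy open() ran into the registration guard.
    static boolean lazyOpenFails() {
        ToyContext context = new ToyContext();
        LazyBatchEmitter emitter = new LazyBatchEmitter(new ToySpout(), context);
        context.finishSetup(); // setup ends before the first batch arrives
        try {
            emitter.emitBatch();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // True if open() during setup registers the metric without error.
    static boolean eagerOpenWorks() {
        ToyContext context = new ToyContext();
        ToySpout spout = new ToySpout();
        spout.open(context);
        context.finishSetup();
        return spout.opened && context.metricCount() == 1;
    }

    public static void main(String[] args) {
        System.out.println("lazy open fails: " + lazyOpenFails());
        System.out.println("eager open works: " + eagerOpenWorks());
    }
}
```

In this model the failure goes away either when open() no longer registers a
metric (the workaround above) or when open() runs during setup.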



2014-05-28 4:35 GMT+09:00 Danijel Schiavuzzi <dani...@schiavuzzi.com>:

> I believe you should be using a Trident Kafka spout variant if you're
> building a Trident topology, not the plain Storm KafkaSpout one.
>
>
> On Tuesday, May 27, 2014, Romain Leroux <leroux....@gmail.com> wrote:
>
>> Hi,
>>
>> First of all thanks to @miguno for his amazing work on
>> kafka-storm-starter.
>>
>> I am trying to add a memcached state to it based on:
>> https://github.com/nathanmarz/trident-memcached
>>
>> More particularly I'd like to test the full stack:
>> Kafka->Storm->TransactionalState(Memcached) with Trident.
>> I am first focusing on a normal KafkaSpout (the default setting in
>> kafka-storm-starter) and a non-transactional state.
>>
>> However, I hit the following unexpected exception during my first tests:
>>
>> ./sbt run
>> ...
>> 9117 [Thread-10] INFO  backtype.storm.daemon.worker - Worker
>> 155f0d72-2e99-495b-9cf5-2523076b3d73 for storm
>> kafka-storm-starter_memcached-1-1401177768 on
>> 3928bc48-49e2-4e88-bb51-eae5642ee963:1025 has finished loading
>> 9118 [Thread-68-__system] INFO  backtype.storm.daemon.executor - Prepared
>> bolt __system:(-1)
>> 9151 [Thread-58-spout0] INFO
>> com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
>> 9171 [Thread-58-spout0] INFO
>> com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
>> 9199 [Thread-58-spout0] INFO  storm.kafka.DynamicBrokersReader - Read
>> partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=
>> 127.0.0.1:9092}}
>> 9200 [Thread-58-spout0] INFO
>> com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
>> 9228 [Thread-58-spout0] ERROR backtype.storm.util - Async loop died!
>> java.lang.RuntimeException: java.lang.RuntimeException:
>> TopologyContext.registerMetric can only be called from within overridden
>> IBolt::prepare() or ISpout::open() method.
>>         at
>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107)
>> ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
>>         at
>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78)
>> ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
>>         at
>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77)
>> ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
>>         at
>> backtype.storm.daemon.executor$eval5170$fn__5171$fn__5183$fn__5230.invoke(executor.clj:745)
>> ~[na:na]
>>         at backtype.storm.util$async_loop$fn__390.invoke(util.clj:433)
>> ~[na:na]
>>         at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.4.0.jar:na]
>>         at java.lang.Thread.run(Thread.java:724) ~[na:1.7.0_25]
>> Caused by: java.lang.RuntimeException: TopologyContext.registerMetric can
>> only be called from within overridden IBolt::prepare() or ISpout::open()
>> method.
>>         at
>> backtype.storm.task.TopologyContext.registerMetric(TopologyContext.java:230)
>> ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
>>         at storm.kafka.KafkaSpout.open(KafkaSpout.java:80)
>> ~[storm-kafka-0.8-plus_2.10-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
>>         at
>> storm.trident.spout.RichSpoutBatchExecutor$RichSpoutEmitter.emitBatch(RichSpoutBatchExecutor.java:105)
>> ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
>>         at
>> storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
>> ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
>>         at
>> storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369)
>> ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
>>         at
>> backtype.storm.daemon.executor$eval5170$fn__5171$tuple_action_fn__5173.invoke(executor.clj:630)
>> ~[na:na]
>>         at
>> backtype.storm.daemon.executor$mk_task_receiver$fn__5091.invoke(executor.clj:398)
>> ~[na:na]
>>         at
>> backtype.storm.disruptor$clojure_handler$reify__1894.onEvent(disruptor.clj:58)
>> ~[na:na]
>>         at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor
>>
>> If someone has some pointers on that issue, they are more than welcome.
>>
>> Here is the code:
>>
>> https://github.com/lerouxrgd/kafka-storm-starter/tree/plugin_trient-memcached
>>
>>
>>
>>
>
> --
> Danijel Schiavuzzi
>
> E: dani...@schiavuzzi.com
> W: www.schiavuzzi.com
> T: +385989035562
> Skype: danijels7
>
