Hello Alec,

Which version of Kafka are you using? 0.7.2?
And which Storm version? 0.8.1?

The consumer you are using was written for Kafka 0.7.2 and Storm 0.8.1.
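
The NoSuchMethodError in your trace names the constructor
SimpleConsumer.<init>(Ljava/lang/String;III)V, that is (String, int, int, int).
As far as I know, the Kafka 0.8 Java API added a client id String as a fifth
argument, so a consumer compiled against 0.7.x cannot find the old constructor
in a 0.8 jar. A minimal sketch to see which constructors the Kafka jar on your
classpath really offers (the class name ConstructorCheck and the helper
constructorsOf are my own, hypothetical names; only standard Java reflection is
used):

```java
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.List;

public class ConstructorCheck {

    /**
     * Returns the parameter lists of all public constructors of the named
     * class, e.g. "(java.lang.String, int, int, int)". Returns an empty list
     * if the class (or one of its dependencies) is not on the classpath.
     */
    public static List<String> constructorsOf(String className) {
        List<String> result = new ArrayList<String>();
        try {
            // 'false' = load the class without running its static initializers
            Class<?> cls = Class.forName(className, false,
                    ConstructorCheck.class.getClassLoader());
            for (Constructor<?> c : cls.getConstructors()) {
                StringBuilder sb = new StringBuilder("(");
                Class<?>[] params = c.getParameterTypes();
                for (int i = 0; i < params.length; i++) {
                    if (i > 0) {
                        sb.append(", ");
                    }
                    sb.append(params[i].getName());
                }
                result.add(sb.append(")").toString());
            }
        } catch (ClassNotFoundException e) {
            // class is not on the classpath: report nothing
        } catch (LinkageError e) {
            // class was found but a dependency is missing: report nothing
        }
        return result;
    }

    public static void main(String[] args) {
        // Default to the class from the stack trace; any class name can be passed.
        String name = args.length > 0
                ? args[0]
                : "kafka.javaapi.consumer.SimpleConsumer";
        List<String> signatures = constructorsOf(name);
        if (signatures.isEmpty()) {
            System.out.println(name + ": not loadable, or no public constructors");
        }
        for (String s : signatures) {
            System.out.println(name + s);
        }
    }
}
```

Run it with the same jar you submit to Storm on the classpath, for example
java -cp target/storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:. ConstructorCheck
If the four-argument constructor is not listed, the jar bundles a Kafka
version newer than the one the consumer code expects.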

If you are using Storm 0.9.1 (or 0.9.2) and Kafka 0.8.0 (or 0.8.1.1), you
can try this example:
https://github.com/mvalleavila/Storm-0.9.1-Kafka-0.8-Test

I still need to update some versions in pom.xml, but it works correctly with
the current versions.

If you want to use the native storm-kafka connector included in version 0.9.2,
you can try this patch to avoid ClassNotFound errors:

https://github.com/buildoop/buildoop/blob/development/recipes/storm/storm-0.9.2_openbus-0.0.1-r1/rpm/sources/storm-kafka-dependencies.patch

Regards

Marcelo




2014-08-05 23:04 GMT+02:00 Sa Li <sa.in.v...@gmail.com>:

> Hi, all
>
> This is a follow-up to my other thread, “kafka-spout running error”, in
> which I described being unable to run the Kafka consumer. Here I am
> running jabbaugh’s consumer (https://github.com/jabbaugh/kafka-storm-consumer)
> with KAFKA_DOMAIN=127.0.0.1, KAFKA_PORT=9092, and
> KAFKA_TOPIC=topictest. Then I ran
>  storm jar target/storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar
> storm.example.trident.ExampleTopology ExampleTopology
>
> Then I got an error similar to the one in my last post (not able to connect to ZooKeeper):
>
> 3593 [Thread-9] INFO  backtype.storm.daemon.worker - Worker
> ca4d20b6-9015-4400-bac3-e219975c310f for storm ExampleTrident-1-1407272043
> on 0e61159b-6701-400a-9d73-d21c84102e37:4 has finished loading
> 3614 [Thread-26-$mastercoord-bg0] INFO
> com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
> 3626 [Thread-26-$mastercoord-bg0] INFO  backtype.storm.daemon.executor -
> Opened spout $mastercoord-bg0:(1)
> 3629 [Thread-26-$mastercoord-bg0] INFO  backtype.storm.daemon.executor -
> Activating spout $mastercoord-bg0:(1)
> 3660 [Thread-22-spout0] ERROR backtype.storm.util - Async loop died!
> java.lang.NoSuchMethodError:
> kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;III)V
> at
> storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:33)
> ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:28)
> ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> storm.kafka.trident.TransactionalTridentKafkaSpout$Emitter.emitPartitionBatchNew(TransactionalTridentKafkaSpout.java:80)
> ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> storm.kafka.trident.TransactionalTridentKafkaSpout$Emitter.emitPartitionBatchNew(TransactionalTridentKafkaSpout.java:60)
> ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter$1.init(PartitionedTridentSpoutExecutor.java:108)
> ~[storm-core-0.9.0.1.jar:na]
> at
> storm.trident.topology.state.RotatingTransactionalState.getState(RotatingTransactionalState.java:66)
> ~[storm-core-0.9.0.1.jar:na]
> at
> storm.trident.topology.state.RotatingTransactionalState.getStateOrCreate(RotatingTransactionalState.java:93)
> ~[storm-core-0.9.0.1.jar:na]
> at
> storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:104)
> ~[storm-core-0.9.0.1.jar:na]
> at
> storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:65)
> ~[storm-core-0.9.0.1.jar:na]
> at
> storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:352)
> ~[storm-core-0.9.0.1.jar:na]
> at
> backtype.storm.daemon.executor$fn__3498$tuple_action_fn__3500.invoke(executor.clj:615)
> ~[storm-core-0.9.0.1.jar:na]
> at
> backtype.storm.daemon.executor$mk_task_receiver$fn__3421.invoke(executor.clj:383)
> ~[storm-core-0.9.0.1.jar:na]
> at
> backtype.storm.disruptor$clojure_handler$reify__2962.onEvent(disruptor.clj:43)
> ~[storm-core-0.9.0.1.jar:na]
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
> ~[storm-core-0.9.0.1.jar:na]
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
> ~[storm-core-0.9.0.1.jar:na]
> at
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
> ~[storm-core-0.9.0.1.jar:na]
> at
> backtype.storm.daemon.executor$fn__3498$fn__3510$fn__3557.invoke(executor.clj:730)
> ~[storm-core-0.9.0.1.jar:na]
> at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403)
> ~[storm-core-0.9.0.1.jar:na]
> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
> at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
> 3660 [Thread-22-spout0] ERROR backtype.storm.daemon.executor -
> java.lang.NoSuchMethodError:
> kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;III)V
> at
> storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:33)
> ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:28)
> ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> storm.kafka.trident.TransactionalTridentKafkaSpout$Emitter.emitPartitionBatchNew(TransactionalTridentKafkaSpout.java:80)
> ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> storm.kafka.trident.TransactionalTridentKafkaSpout$Emitter.emitPartitionBatchNew(TransactionalTridentKafkaSpout.java:60)
> ~[storm-oam-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
> at
> storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter$1.init(PartitionedTridentSpoutExecutor.java:108)
> ~[storm-core-0.9.0.1.jar:na]
> at
> storm.trident.topology.state.RotatingTransactionalState.getState(RotatingTransactionalState.java:66)
> ~[storm-core-0.9.0.1.jar:na]
> at
> storm.trident.topology.state.RotatingTransactionalState.getStateOrCreate(RotatingTransactionalState.java:93)
> ~[storm-core-0.9.0.1.jar:na]
> at
> storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:104)
> ~[storm-core-0.9.0.1.jar:na]
> at
> storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:65)
> ~[storm-core-0.9.0.1.jar:na]
> at
> storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:352)
> ~[storm-core-0.9.0.1.jar:na]
> at
> backtype.storm.daemon.executor$fn__3498$tuple_action_fn__3500.invoke(executor.clj:615)
> ~[storm-core-0.9.0.1.jar:na]
> at
> backtype.storm.daemon.executor$mk_task_receiver$fn__3421.invoke(executor.clj:383)
> ~[storm-core-0.9.0.1.jar:na]
> at
> backtype.storm.disruptor$clojure_handler$reify__2962.onEvent(disruptor.clj:43)
> ~[storm-core-0.9.0.1.jar:na]
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
> ~[storm-core-0.9.0.1.jar:na]
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
> ~[storm-core-0.9.0.1.jar:na]
> at
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
> ~[storm-core-0.9.0.1.jar:na]
> at
> backtype.storm.daemon.executor$fn__3498$fn__3510$fn__3557.invoke(executor.clj:730)
> ~[storm-core-0.9.0.1.jar:na]
> at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403)
> ~[storm-core-0.9.0.1.jar:na]
> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
> at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
> 3719 [Thread-22-spout0] INFO  backtype.storm.util - Halting process:
> ("Worker died")
> root@DO-mq-dev:/home/stuser/kafkaprj/kafka-storm-consumer#
>
> Can anyone point out what the problem is? Thanks.
>
> Alec
>
