You are only sleeping for 100 milliseconds before shutting down the local cluster, which is probably not long enough for the topology to come up and start processing messages. Try increasing the sleep time to something like 10 seconds.
You can also reduce startup time with the following JVM flag: -Djava.net.preferIPv4Stack=true - Taylor On Aug 5, 2014, at 1:16 PM, Sa Li <sa.in.v...@gmail.com> wrote: > Sorry, the stormTopology: > >> TridentTopology topology = new TridentTopology(); >> BrokerHosts zk = new ZkHosts("localhost"); >> TridentKafkaConfig spoutConf = new >> TridentKafkaConfig(zk, “topictest"); >> spoutConf.scheme = new SchemeAsMultiScheme(new >> StringScheme()); >> OpaqueTridentKafkaSpout spout = new >> OpaqueTridentKafkaSpout(spoutConf); > > > > > On Aug 5, 2014, at 9:56 AM, Sa Li <sa.in.v...@gmail.com> wrote: > >> Thank you very much, Marcelo, it indeed worked, now I can run my code >> without getting error. However, another thing is keeping bother me, >> following is my code: >> >> public static class PrintStream implements Filter { >> >> @SuppressWarnings("rawtypes”) >> @Override >> public void prepare(Map conf, TridentOperationContext >> context) { >> } >> @Override >> public void cleanup() { >> } >> @Override >> public boolean isKeep(TridentTuple tuple) { >> System.out.println(tuple); >> return true; >> } >> } >> public static StormTopology buildTopology(LocalDRPC drpc) throws IOException >> { >> >> TridentTopology topology = new TridentTopology(); >> BrokerHosts zk = new ZkHosts("localhost"); >> TridentKafkaConfig spoutConf = new >> TridentKafkaConfig(zk, "ingest_test"); >> spoutConf.scheme = new SchemeAsMultiScheme(new >> StringScheme()); >> OpaqueTridentKafkaSpout spout = new >> OpaqueTridentKafkaSpout(spoutConf); >> >> topology.newStream("kafka", spout) >> .each(new Fields("str"), >> new PrintStream() >> >> ); >> >> return topology.build(); >> } >> public static void main(String[] args) throws Exception { >> >> Config conf = new Config(); >> conf.setDebug(true); >> conf.setMaxSpoutPending(1); >> conf.setMaxTaskParallelism(3); >> LocalDRPC drpc = new LocalDRPC(); >> LocalCluster cluster = new LocalCluster(); >> cluster.submitTopology("kafka", conf, buildTopology(drpc)); >> >> Thread.sleep(100); >> cluster.shutdown(); >> } >> >> What I expect is quite simple, print out the message I collect from a kafka >> producer playback process which is running separately. The topic is listed >> as: >> >> root@DO-mq-dev:/etc/kafka# bin/kafka-list-topic.sh --zookeeper localhost:2181 >> topic: topictest partition: 0 leader: 1 replicas: 1,3,2 isr: >> 1,3,2 >> topic: topictest partition: 1 leader: 2 replicas: 2,1,3 isr: >> 2,1,3 >> topic: topictest partition: 2 leader: 3 replicas: 3,2,1 isr: >> 3,2,1 >> topic: topictest partition: 3 leader: 1 replicas: 1,2,3 isr: >> 1,2,3 >> topic: topictest partition: 4 leader: 2 replicas: 2,3,1 isr: >> 2,3,1 >> >> When I am running the code, this is what I saw on the screen, seems no >> error, but no message print out as well: >> >> SLF4J: Class path contains multiple SLF4J bindings. >> SLF4J: Found binding in >> [jar:file:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] >> SLF4J: Found binding in >> [jar:file:/home/stuser/kafkaprj/kafka-storm-bitmap/target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class] >> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an >> explanation. >> Running: java -client -Dstorm.options= -Dstorm.home=/etc/storm-0.9.0.1 >> -Djava.library.path=/usr/lib/jvm/java-7-openjdk-amd64 -Dstorm.conf.file= -cp >> /etc/storm-0.9.0.1/storm-netty-0.9.0.1.jar:/etc/storm-0.9.0.1/storm-core-0.9.0.1.jar:/etc/storm-0.9.0.1/storm-console-logging-0.9.0.1.jar:/etc/storm-0.9.0.1/lib/log4j-over-slf4j-1.6.6.jar:/etc/storm-0.9.0.1/lib/commons-io-1.4.jar:/etc/storm-0.9.0.1/lib/joda-time-2.0.jar:/etc/storm-0.9.0.1/lib/tools.nrepl-0.2.3.jar:/etc/storm-0.9.0.1/lib/servlet-api-2.5.jar:/etc/storm-0.9.0.1/lib/curator-framework-1.0.1.jar:/etc/storm-0.9.0.1/lib/core.incubator-0.1.0.jar:/etc/storm-0.9.0.1/lib/jetty-6.1.26.jar:/etc/storm-0.9.0.1/lib/commons-codec-1.4.jar:/etc/storm-0.9.0.1/lib/servlet-api-2.5-20081211.jar:/etc/storm-0.9.0.1/lib/httpclient-4.1.1.jar:/etc/storm-0.9.0.1/lib/commons-exec-1.1.jar:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar:/etc/storm-0.9.0.1/lib/libthrift7-0.7.0-2.jar:/etc/storm-0.9.0.1/lib/minlog-1.2.jar:/etc/storm-0.9.0.1/lib/clojure-complete-0.2.3.jar:/etc/storm-0.9.0.1/lib/clojure-1.4.0.jar:/etc/storm-0.9.0.1/lib/asm-4.0.jar:/etc/storm-0.9.0.1/lib/mockito-all-1.9.5.jar:/etc/storm-0.9.0.1/lib/commons-fileupload-1.2.1.jar:/etc/storm-0.9.0.1/lib/clout-1.0.1.jar:/etc/storm-0.9.0.1/lib/ring-servlet-0.3.11.jar:/etc/storm-0.9.0.1/lib/ring-devel-0.3.11.jar:/etc/storm-0.9.0.1/lib/jgrapht-0.8.3.jar:/etc/storm-0.9.0.1/lib/snakeyaml-1.11.jar:/etc/storm-0.9.0.1/lib/reflectasm-1.07-shaded.jar:/etc/storm-0.9.0.1/lib/kryo-2.17.jar:/etc/storm-0.9.0.1/lib/ring-jetty-adapter-0.3.11.jar:/etc/storm-0.9.0.1/lib/compojure-1.1.3.jar:/etc/storm-0.9.0.1/lib/objenesis-1.2.jar:/etc/storm-0.9.0.1/lib/commons-logging-1.1.1.jar:/etc/storm-0.9.0.1/lib/tools.macro-0.1.0.jar:/etc/storm-0.9.0.1/lib/junit-3.8.1.jar:/etc/storm-0.9.0.1/lib/json-simple-1.1.jar:/etc/storm-0.9.0.1/lib/tools.cli-0.2.2.jar:/etc/storm-0.9.0.1/lib/curator-client-1.0.1.jar:/etc/storm-0.9.0.1/lib/jline-0.9.94.jar:/etc/storm-0.9.0.1/lib/zookeeper-3.3.3.jar:/etc/storm-0.9.0.1/lib/guava-13.0.jar:/etc/storm-0.9.0.1/lib/commons-lang-2.5.jar:/etc/storm-0.9.0.1/lib/carbonite-1.5.0.jar:/etc/storm-0.9.0.1/lib/ring-core-1.1.5.jar:/etc/storm-0.9.0.1/lib/jzmq-2.1.0.jar:/etc/storm-0.9.0.1/lib/hiccup-0.3.6.jar:/etc/storm-0.9.0.1/lib/tools.logging-0.2.3.jar:/etc/storm-0.9.0.1/lib/kafka_2.9.2-0.8.0.jar:/etc/storm-0.9.0.1/lib/clj-stacktrace-0.2.2.jar:/etc/storm-0.9.0.1/lib/math.numeric-tower-0.0.1.jar:/etc/storm-0.9.0.1/lib/slf4j-api-1.6.5.jar:/etc/storm-0.9.0.1/lib/netty-3.6.3.Final.jar:/etc/storm-0.9.0.1/lib/disruptor-2.10.1.jar:/etc/storm-0.9.0.1/lib/jetty-util-6.1.26.jar:/etc/storm-0.9.0.1/lib/httpcore-4.1.jar:/etc/storm-0.9.0.1/lib/logback-core-1.0.6.jar:/etc/storm-0.9.0.1/lib/clj-time-0.4.1.jar:target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:/etc/storm-0.9.0.1/conf:/etc/storm-0.9.0.1/bin >> >> -Dstorm.jar=target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar >> storm.artemis.KafkaConsumerTopology KafkaConsumerTopology >> SLF4J: Class path contains multiple SLF4J bindings. >> SLF4J: Found binding in >> [jar:file:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] >> SLF4J: Found binding in >> [jar:file:/home/stuser/kafkaprj/kafka-storm-bitmap/target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class] >> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an >> explanation. >> 1113 [main] INFO backtype.storm.zookeeper - Starting inprocess zookeeper at >> port 2000 and dir /tmp/dd37d0cc-79b3-4f23-b6a5-3bcf5a9f0879 >> 1216 [main] INFO backtype.storm.daemon.nimbus - Starting Nimbus with conf >> {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", >> "topology.tick.tuple.freq.secs" nil, >> "topology.builtin.metrics.bucket.size.secs" 60, >> "topology.fall.back.on.java.serialization" true, >> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, >> "topology.skip.missing.kryo.registrations" true, >> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", >> "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, >> "topology.trident.batch.emit.interval.millis" 50, "nimbus.monitor.freq.secs" >> 10, "logviewer.childopts" "-Xmx128m", "java.library.path" >> "/usr/lib/jvm/java-7-openjdk-amd64", "topology.executor.send.buffer.size" >> 1024, "storm.local.dir" "/tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9", >> "storm.messaging.netty.buffer_size" 5242880, >> "supervisor.worker.start.timeout.secs" 120, >> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" >> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, >> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "10.100.70.128", >> "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, >> "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" >> 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" >> "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, >> "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, >> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" >> "/transactional", "topology.acker.executors" nil, >> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, >> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", >> "supervisor.heartbeat.frequency.secs" 5, >> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, >> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", >> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, >> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, >> "topology.spout.wait.strategy" >> "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.max.spout.pending" >> nil, "storm.zookeeper.retry.interval" 1000, >> "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" >> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" >> [6700 6701 6702 6703], "topology.debug" false, "nimbus.task.launch.secs" >> 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" >> 30, "task.refresh.poll.secs" 10, "topology.workers" 1, >> "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, >> "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, >> "topology.tuple.serializer" >> "backtype.storm.serialization.types.ListDelegateSerializer", >> "topology.disruptor.wait.strategy" >> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, >> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" >> "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" >> 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" >> 5, "storm.thrift.transport" >> "backtype.storm.security.auth.SimpleTransportPlugin", >> "topology.state.synchronization.timeout.secs" 60, >> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, >> "storm.messaging.transport" "backtype.storm.messaging.zmq", >> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, >> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" >> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", >> "topology.optimize" true, "topology.max.task.parallelism" nil} >> 1219 [main] INFO backtype.storm.daemon.nimbus - Using default scheduler >> 1237 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - >> Starting >> 1303 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state >> update: :connected:none >> 1350 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - >> Starting >> 1417 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - >> Starting >> 1432 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state >> update: :connected:none >> 1482 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - >> Starting >> 1484 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - >> Starting >> 1532 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state >> update: :connected:none >> 1540 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - >> Starting >> 1568 [main] INFO backtype.storm.daemon.supervisor - Starting Supervisor >> with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", >> "topology.tick.tuple.freq.secs" nil, >> "topology.builtin.metrics.bucket.size.secs" 60, >> "topology.fall.back.on.java.serialization" true, >> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, >> "topology.skip.missing.kryo.registrations" true, >> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", >> "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, >> "topology.trident.batch.emit.interval.millis" 50, "nimbus.monitor.freq.secs" >> 10, "logviewer.childopts" "-Xmx128m", "java.library.path" >> "/usr/lib/jvm/java-7-openjdk-amd64", "topology.executor.send.buffer.size" >> 1024, "storm.local.dir" "/tmp/3e515769-ebf5-4085-a6bf-35f4ad8be388", >> "storm.messaging.netty.buffer_size" 5242880, >> "supervisor.worker.start.timeout.secs" 120, >> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" >> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, >> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "10.100.70.128", >> "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, >> "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" >> 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" >> "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, >> "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, >> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" >> "/transactional", "topology.acker.executors" nil, >> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, >> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", >> "supervisor.heartbeat.frequency.secs" 5, >> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, >> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", >> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, >> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, >> "topology.spout.wait.strategy" >> "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.max.spout.pending" >> nil, "storm.zookeeper.retry.interval" 1000, >> "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" >> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" >> (1 2 3), "topology.debug" false, "nimbus.task.launch.secs" 120, >> "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, >> "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" >> "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, >> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" >> "backtype.storm.serialization.types.ListDelegateSerializer", >> "topology.disruptor.wait.strategy" >> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, >> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" >> "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" >> 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" >> 5, "storm.thrift.transport" >> "backtype.storm.security.auth.SimpleTransportPlugin", >> "topology.state.synchronization.timeout.secs" 60, >> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, >> "storm.messaging.transport" "backtype.storm.messaging.zmq", >> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, >> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" >> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", >> "topology.optimize" true, "topology.max.task.parallelism" nil} >> 1576 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - >> Starting >> 1582 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state >> update: :connected:none >> 1590 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - >> Starting >> 1632 [main] INFO backtype.storm.daemon.supervisor - Starting supervisor >> with id 944e6152-ca58-4d2b-8325-94ac98f43995 at host DO-mq-dev >> 1636 [main] INFO backtype.storm.daemon.supervisor - Starting Supervisor >> with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", >> "topology.tick.tuple.freq.secs" nil, >> "topology.builtin.metrics.bucket.size.secs" 60, >> "topology.fall.back.on.java.serialization" true, >> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, >> "topology.skip.missing.kryo.registrations" true, >> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", >> "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, >> "topology.trident.batch.emit.interval.millis" 50, "nimbus.monitor.freq.secs" >> 10, "logviewer.childopts" "-Xmx128m", "java.library.path" >> "/usr/lib/jvm/java-7-openjdk-amd64", "topology.executor.send.buffer.size" >> 1024, "storm.local.dir" "/tmp/d0aeb5f4-0830-4efd-be7f-bc40d5b66912", >> "storm.messaging.netty.buffer_size" 5242880, >> "supervisor.worker.start.timeout.secs" 120, >> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" >> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, >> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "10.100.70.128", >> "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, >> "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" >> 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" >> "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, >> "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, >> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" >> "/transactional", "topology.acker.executors" nil, >> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, >> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", >> "supervisor.heartbeat.frequency.secs" 5, >> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, >> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", >> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, >> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, >> "topology.spout.wait.strategy" >> "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.max.spout.pending" >> nil, "storm.zookeeper.retry.interval" 1000, >> "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" >> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" >> (4 5 6), "topology.debug" false, "nimbus.task.launch.secs" 120, >> "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, >> "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" >> "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, >> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" >> "backtype.storm.serialization.types.ListDelegateSerializer", >> "topology.disruptor.wait.strategy" >> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, >> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" >> "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" >> 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" >> 5, "storm.thrift.transport" >> "backtype.storm.security.auth.SimpleTransportPlugin", >> "topology.state.synchronization.timeout.secs" 60, >> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, >> "storm.messaging.transport" "backtype.storm.messaging.zmq", >> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, >> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" >> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", >> "topology.optimize" true, "topology.max.task.parallelism" nil} >> 1638 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - >> Starting >> 1648 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state >> update: :connected:none >> 1690 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - >> Starting >> 1740 [main] INFO backtype.storm.daemon.supervisor - Starting supervisor >> with id e8303ca7-9cc4-4551-8387-7559fc3c53fc at host DO-mq-dev >> 1944 [main] INFO backtype.storm.daemon.nimbus - Received topology >> submission for kafka with conf {"topology.max.task.parallelism" nil, >> "topology.acker.executors" nil, "topology.kryo.register" >> {"storm.trident.topology.TransactionAttempt" nil}, >> "topology.kryo.decorators" (), "topology.name" "kafka", "storm.id" >> "kafka-1-1407257070", "topology.debug" true} >> 1962 [main] INFO backtype.storm.daemon.nimbus - Activating kafka: >> kafka-1-1407257070 >> 2067 [main] INFO backtype.storm.scheduler.EvenScheduler - Available slots: >> (["944e6152-ca58-4d2b-8325-94ac98f43995" 1] >> ["944e6152-ca58-4d2b-8325-94ac98f43995" 2] >> ["944e6152-ca58-4d2b-8325-94ac98f43995" 3] >> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 4] >> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 5] >> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 6]) >> 2088 [main] INFO backtype.storm.daemon.nimbus - Setting new assignment for >> topology id kafka-1-1407257070: >> #backtype.storm.daemon.common.Assignment{:master-code-dir >> "/tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9/nimbus/stormdist/kafka-1-1407257070", >> :node->host {"944e6152-ca58-4d2b-8325-94ac98f43995" "DO-mq-dev"}, >> :executor->node+port {[3 3] ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [5 >> 5] ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [4 4] >> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [2 2] >> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [1 1] >> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1]}, :executor->start-time-secs {[1 >> 1] 1407257070, [2 2] 1407257070, [4 4] 1407257070, [5 5] 1407257070, [3 3] >> 1407257070}} >> 2215 [main] INFO backtype.storm.daemon.nimbus - Shutting down master >> 2223 [main] INFO backtype.storm.daemon.nimbus - Shut down master >> 2239 [main] INFO backtype.storm.daemon.supervisor - Shutting down >> supervisor 944e6152-ca58-4d2b-8325-94ac98f43995 >> 2240 [Thread-6] INFO backtype.storm.event - Event manager interrupted >> 2241 [Thread-7] INFO backtype.storm.event - Event manager interrupted >> 2248 [main] INFO backtype.storm.daemon.supervisor - Shutting down >> supervisor e8303ca7-9cc4-4551-8387-7559fc3c53fc >> 2248 [Thread-9] INFO backtype.storm.event - Event manager interrupted >> 2248 [Thread-10] INFO backtype.storm.event - Event manager interrupted >> 2256 [main] INFO backtype.storm.testing - Shutting down in process zookeeper >> 2257 [main] INFO backtype.storm.testing - Done shutting down in process >> zookeeper >> 2258 [main] INFO backtype.storm.testing - Deleting temporary path >> /tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9 >> 2259 [main] INFO backtype.storm.testing - Deleting temporary path >> /tmp/dd37d0cc-79b3-4f23-b6a5-3bcf5a9f0879 >> 2260 [main] INFO backtype.storm.testing - Deleting temporary path >> /tmp/3e515769-ebf5-4085-a6bf-35f4ad8be388 >> 2261 [main] INFO backtype.storm.testing - Deleting temporary path >> /tmp/d0aeb5f4-0830-4efd-be7f-bc40d5b66912 >> >> Anyone can help me locate what the problem is? I really need to walk through >> this step in order to be able to replace .each(printStream()) with other >> functions. >> >> >> Thanks >> >> Alec >> >> On Aug 4, 2014, at 4:24 AM, Marcelo Valle <mva...@redoop.org> wrote: >> >>> hello, >>> >>> you can check your .jar application with command " jar tf " to see if class >>> kafka/api/OffsetRequest.class is part of the jar. >>> If not you can try to copy kafka-2.9.2-0.8.0.jar (or version you are using) >>> in storm_lib directory >>> >>> Marcelo >>> >>> >>> 2014-07-31 23:33 GMT+02:00 Sa Li <sa.in.v...@gmail.com>: >>> Hi, all >>> >>> I am running a kafka-spout code in storm-server, the pom is >>> >>> <groupId>org.apache.kafka</groupId> >>> <artifactId>kafka_2.9.2</artifactId> >>> <version>0.8.0</version> >>> <scope>provided</scope> >>> >>> <exclusions> >>> <exclusion> >>> <groupId>org.apache.zookeeper</groupId> >>> <artifactId>zookeeper</artifactId> >>> </exclusion> >>> <exclusion> >>> <groupId>log4j</groupId> >>> <artifactId>log4j</artifactId> >>> </exclusion> >>> </exclusions> >>> >>> </dependency> >>> >>> <!-- Storm-Kafka compiled --> >>> >>> <dependency> >>> <artifactId>storm-kafka</artifactId> >>> <groupId>org.apache.storm</groupId> >>> <version>0.9.2-incubating</version> >>> <scope>*compile*</scope> >>> </dependency> >>> >>> I can mvn package it, but when I run it >>> root@DO-mq-dev:/home/stuser/kafkaprj/kafka-storm-bitmap# storm jar >>> target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar >>> storm.artemis.KafkaConsumerTopology KafkaConsumerTopology >>> >>> >>> I am getting such error >>> >>> 1657 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - >>> Starting >>> 1682 [main] INFO backtype.storm.daemon.supervisor - Starting supervisor >>> with id a66e0c61-a951-4c1b-a43f-3fb0d12cb226 at host DO-mq-dev >>> 1698 [main] ERROR org.apache.zookeeper.server.NIOServerCnxn - Thread >>> Thread[main,5,main] died >>> java.lang.NoClassDefFoundError: kafka/api/OffsetRequest >>> at storm.artemis.kafka.KafkaConfig.<init>(KafkaConfig.java:26) >>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >>> at >>> storm.artemis.kafka.trident.TridentKafkaConfig.<init>(TridentKafkaConfig.java:13) >>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >>> at >>> storm.artemis.KafkaConsumerTopology.buildTopology(KafkaConsumerTopology.java:115) >>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >>> at >>> storm.artemis.KafkaConsumerTopology.main(KafkaConsumerTopology.java:144) >>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >>> Caused by: java.lang.ClassNotFoundException: kafka.api.OffsetRequest >>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366) ~[na:1.7.0_55] >>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355) ~[na:1.7.0_55] >>> at java.security.AccessController.doPrivileged(Native Method) >>> ~[na:1.7.0_55] >>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354) >>> ~[na:1.7.0_55] >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425) ~[na:1.7.0_55] >>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) >>> ~[na:1.7.0_55] >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ~[na:1.7.0_55] >>> >>> >>> >>> >>> I try to poke around online, could not find a solution for it, any idea >>> about that? >>> >>> >>> Thanks >>> >>> Alec >>> >>> >>> >>> >> >
signature.asc
Description: Message signed with OpenPGP using GPGMail