You are only sleeping for 100 milliseconds before shutting down the local 
cluster, which is probably not long enough for the topology to come up and 
start processing messages. Try increasing the sleep time to something like 10 
seconds.

You can also reduce startup time with the following JVM flag:

-Djava.net.preferIPv4Stack=true

- Taylor

On Aug 5, 2014, at 1:16 PM, Sa Li <sa.in.v...@gmail.com> wrote:

> Sorry, the stormTopology:
> 
>>                         TridentTopology topology = new TridentTopology();
>>                      BrokerHosts zk = new ZkHosts("localhost");
>>                      TridentKafkaConfig spoutConf = new 
>> TridentKafkaConfig(zk, “topictest");
>>                      spoutConf.scheme = new SchemeAsMultiScheme(new 
>> StringScheme());
>>                      OpaqueTridentKafkaSpout spout = new 
>> OpaqueTridentKafkaSpout(spoutConf);
> 
> 
> 
> 
> On Aug 5, 2014, at 9:56 AM, Sa Li <sa.in.v...@gmail.com> wrote:
> 
>> Thank you very much, Marcelo, it indeed worked, now I can run my code 
>> without getting error. However, another thing is keeping bother me, 
>> following is my code:
>> 
>> public static class PrintStream implements Filter {
>> 
>>                      @SuppressWarnings("rawtypes”)
>>                      @Override
>>                      public void prepare(Map conf, TridentOperationContext 
>> context) {
>>                      }
>>                      @Override
>>                      public void cleanup() {
>>                      }
>>                      @Override
>>                      public boolean isKeep(TridentTuple tuple) {
>>                              System.out.println(tuple);
>>                              return true;
>>                      }
>> }
>> public static StormTopology buildTopology(LocalDRPC drpc) throws IOException 
>> {
>>                 
>>                      TridentTopology topology = new TridentTopology();
>>                      BrokerHosts zk = new ZkHosts("localhost");
>>                      TridentKafkaConfig spoutConf = new 
>> TridentKafkaConfig(zk, "ingest_test");
>>                      spoutConf.scheme = new SchemeAsMultiScheme(new 
>> StringScheme());
>>                      OpaqueTridentKafkaSpout spout = new 
>> OpaqueTridentKafkaSpout(spoutConf);
>>                                
>>                      topology.newStream("kafka", spout)  
>>                                          .each(new Fields("str"),
>>                                             new PrintStream()                
>>      
>>                      );
>>                     
>>                     return topology.build();
>> }
>> public static void main(String[] args) throws Exception {
>>               
>>                  Config conf = new Config();
>>                  conf.setDebug(true);
>>                     conf.setMaxSpoutPending(1);
>>                  conf.setMaxTaskParallelism(3);
>>                  LocalDRPC drpc = new LocalDRPC();
>>                  LocalCluster cluster = new LocalCluster();
>>                  cluster.submitTopology("kafka", conf, buildTopology(drpc)); 
>>     
>>                  Thread.sleep(100);
>>                  cluster.shutdown();   
>> } 
>> 
>> What I expect is quite simple, print out the message I collect from a kafka 
>> producer playback process which is running separately. The topic is listed 
>> as:
>> 
>> root@DO-mq-dev:/etc/kafka# bin/kafka-list-topic.sh --zookeeper localhost:2181
>> topic: topictest        partition: 0    leader: 1       replicas: 1,3,2 isr: 
>> 1,3,2
>> topic: topictest        partition: 1    leader: 2       replicas: 2,1,3 isr: 
>> 2,1,3
>> topic: topictest        partition: 2    leader: 3       replicas: 3,2,1 isr: 
>> 3,2,1
>> topic: topictest        partition: 3    leader: 1       replicas: 1,2,3 isr: 
>> 1,2,3
>> topic: topictest        partition: 4    leader: 2       replicas: 2,3,1 isr: 
>> 2,3,1
>> 
>> When I am running the code, this is what I saw on the screen, seems no 
>> error, but no message print out as well:
>> 
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in 
>> [jar:file:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in 
>> [jar:file:/home/stuser/kafkaprj/kafka-storm-bitmap/target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
>> explanation.
>> Running: java -client -Dstorm.options= -Dstorm.home=/etc/storm-0.9.0.1 
>> -Djava.library.path=/usr/lib/jvm/java-7-openjdk-amd64 -Dstorm.conf.file= -cp 
>> /etc/storm-0.9.0.1/storm-netty-0.9.0.1.jar:/etc/storm-0.9.0.1/storm-core-0.9.0.1.jar:/etc/storm-0.9.0.1/storm-console-logging-0.9.0.1.jar:/etc/storm-0.9.0.1/lib/log4j-over-slf4j-1.6.6.jar:/etc/storm-0.9.0.1/lib/commons-io-1.4.jar:/etc/storm-0.9.0.1/lib/joda-time-2.0.jar:/etc/storm-0.9.0.1/lib/tools.nrepl-0.2.3.jar:/etc/storm-0.9.0.1/lib/servlet-api-2.5.jar:/etc/storm-0.9.0.1/lib/curator-framework-1.0.1.jar:/etc/storm-0.9.0.1/lib/core.incubator-0.1.0.jar:/etc/storm-0.9.0.1/lib/jetty-6.1.26.jar:/etc/storm-0.9.0.1/lib/commons-codec-1.4.jar:/etc/storm-0.9.0.1/lib/servlet-api-2.5-20081211.jar:/etc/storm-0.9.0.1/lib/httpclient-4.1.1.jar:/etc/storm-0.9.0.1/lib/commons-exec-1.1.jar:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar:/etc/storm-0.9.0.1/lib/libthrift7-0.7.0-2.jar:/etc/storm-0.9.0.1/lib/minlog-1.2.jar:/etc/storm-0.9.0.1/lib/clojure-complete-0.2.3.jar:/etc/storm-0.9.0.1/lib/clojure-1.4.0.jar:/etc/storm-0.9.0.1/lib/asm-4.0.jar:/etc/storm-0.9.0.1/lib/mockito-all-1.9.5.jar:/etc/storm-0.9.0.1/lib/commons-fileupload-1.2.1.jar:/etc/storm-0.9.0.1/lib/clout-1.0.1.jar:/etc/storm-0.9.0.1/lib/ring-servlet-0.3.11.jar:/etc/storm-0.9.0.1/lib/ring-devel-0.3.11.jar:/etc/storm-0.9.0.1/lib/jgrapht-0.8.3.jar:/etc/storm-0.9.0.1/lib/snakeyaml-1.11.jar:/etc/storm-0.9.0.1/lib/reflectasm-1.07-shaded.jar:/etc/storm-0.9.0.1/lib/kryo-2.17.jar:/etc/storm-0.9.0.1/lib/ring-jetty-adapter-0.3.11.jar:/etc/storm-0.9.0.1/lib/compojure-1.1.3.jar:/etc/storm-0.9.0.1/lib/objenesis-1.2.jar:/etc/storm-0.9.0.1/lib/commons-logging-1.1.1.jar:/etc/storm-0.9.0.1/lib/tools.macro-0.1.0.jar:/etc/storm-0.9.0.1/lib/junit-3.8.1.jar:/etc/storm-0.9.0.1/lib/json-simple-1.1.jar:/etc/storm-0.9.0.1/lib/tools.cli-0.2.2.jar:/etc/storm-0.9.0.1/lib/curator-client-1.0.1.jar:/etc/storm-0.9.0.1/lib/jline-0.9.94.jar:/etc/storm-0.9.0.1/lib/zookeeper-3.3.3.jar:/etc/storm-0.9.0.1/lib/guava-13.0.jar:/etc/storm-0.9.0.1/lib/commons-lang-2.5.jar:/etc/storm-0.9.0.1/lib/carbonite-1.5.0.jar:/etc/storm-0.9.0.1/lib/ring-core-1.1.5.jar:/etc/storm-0.9.0.1/lib/jzmq-2.1.0.jar:/etc/storm-0.9.0.1/lib/hiccup-0.3.6.jar:/etc/storm-0.9.0.1/lib/tools.logging-0.2.3.jar:/etc/storm-0.9.0.1/lib/kafka_2.9.2-0.8.0.jar:/etc/storm-0.9.0.1/lib/clj-stacktrace-0.2.2.jar:/etc/storm-0.9.0.1/lib/math.numeric-tower-0.0.1.jar:/etc/storm-0.9.0.1/lib/slf4j-api-1.6.5.jar:/etc/storm-0.9.0.1/lib/netty-3.6.3.Final.jar:/etc/storm-0.9.0.1/lib/disruptor-2.10.1.jar:/etc/storm-0.9.0.1/lib/jetty-util-6.1.26.jar:/etc/storm-0.9.0.1/lib/httpcore-4.1.jar:/etc/storm-0.9.0.1/lib/logback-core-1.0.6.jar:/etc/storm-0.9.0.1/lib/clj-time-0.4.1.jar:target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:/etc/storm-0.9.0.1/conf:/etc/storm-0.9.0.1/bin
>>  
>> -Dstorm.jar=target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar
>>  storm.artemis.KafkaConsumerTopology KafkaConsumerTopology
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in 
>> [jar:file:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in 
>> [jar:file:/home/stuser/kafkaprj/kafka-storm-bitmap/target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
>> explanation.
>> 1113 [main] INFO  backtype.storm.zookeeper - Starting inprocess zookeeper at 
>> port 2000 and dir /tmp/dd37d0cc-79b3-4f23-b6a5-3bcf5a9f0879
>> 1216 [main] INFO  backtype.storm.daemon.nimbus - Starting Nimbus with conf 
>> {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", 
>> "topology.tick.tuple.freq.secs" nil, 
>> "topology.builtin.metrics.bucket.size.secs" 60, 
>> "topology.fall.back.on.java.serialization" true, 
>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, 
>> "topology.skip.missing.kryo.registrations" true, 
>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", 
>> "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, 
>> "topology.trident.batch.emit.interval.millis" 50, "nimbus.monitor.freq.secs" 
>> 10, "logviewer.childopts" "-Xmx128m", "java.library.path" 
>> "/usr/lib/jvm/java-7-openjdk-amd64", "topology.executor.send.buffer.size" 
>> 1024, "storm.local.dir" "/tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9", 
>> "storm.messaging.netty.buffer_size" 5242880, 
>> "supervisor.worker.start.timeout.secs" 120, 
>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 
>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, 
>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "10.100.70.128", 
>> "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, 
>> "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 
>> 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" 
>> "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, 
>> "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, 
>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" 
>> "/transactional", "topology.acker.executors" nil, 
>> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, 
>> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", 
>> "supervisor.heartbeat.frequency.secs" 5, 
>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, 
>> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", 
>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, 
>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, 
>> "topology.spout.wait.strategy" 
>> "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.max.spout.pending" 
>> nil, "storm.zookeeper.retry.interval" 1000, 
>> "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" 
>> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" 
>> [6700 6701 6702 6703], "topology.debug" false, "nimbus.task.launch.secs" 
>> 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 
>> 30, "task.refresh.poll.secs" 10, "topology.workers" 1, 
>> "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, 
>> "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, 
>> "topology.tuple.serializer" 
>> "backtype.storm.serialization.types.ListDelegateSerializer", 
>> "topology.disruptor.wait.strategy" 
>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, 
>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" 
>> "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 
>> 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 
>> 5, "storm.thrift.transport" 
>> "backtype.storm.security.auth.SimpleTransportPlugin", 
>> "topology.state.synchronization.timeout.secs" 60, 
>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, 
>> "storm.messaging.transport" "backtype.storm.messaging.zmq", 
>> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, 
>> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 
>> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", 
>> "topology.optimize" true, "topology.max.task.parallelism" nil}
>> 1219 [main] INFO  backtype.storm.daemon.nimbus - Using default scheduler
>> 1237 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl - 
>> Starting
>> 1303 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state 
>> update: :connected:none
>> 1350 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl - 
>> Starting
>> 1417 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl - 
>> Starting
>> 1432 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state 
>> update: :connected:none
>> 1482 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl - 
>> Starting
>> 1484 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl - 
>> Starting
>> 1532 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state 
>> update: :connected:none
>> 1540 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl - 
>> Starting
>> 1568 [main] INFO  backtype.storm.daemon.supervisor - Starting Supervisor 
>> with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", 
>> "topology.tick.tuple.freq.secs" nil, 
>> "topology.builtin.metrics.bucket.size.secs" 60, 
>> "topology.fall.back.on.java.serialization" true, 
>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, 
>> "topology.skip.missing.kryo.registrations" true, 
>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", 
>> "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, 
>> "topology.trident.batch.emit.interval.millis" 50, "nimbus.monitor.freq.secs" 
>> 10, "logviewer.childopts" "-Xmx128m", "java.library.path" 
>> "/usr/lib/jvm/java-7-openjdk-amd64", "topology.executor.send.buffer.size" 
>> 1024, "storm.local.dir" "/tmp/3e515769-ebf5-4085-a6bf-35f4ad8be388", 
>> "storm.messaging.netty.buffer_size" 5242880, 
>> "supervisor.worker.start.timeout.secs" 120, 
>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 
>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, 
>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "10.100.70.128", 
>> "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, 
>> "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 
>> 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" 
>> "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, 
>> "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, 
>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" 
>> "/transactional", "topology.acker.executors" nil, 
>> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, 
>> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", 
>> "supervisor.heartbeat.frequency.secs" 5, 
>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, 
>> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", 
>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, 
>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, 
>> "topology.spout.wait.strategy" 
>> "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.max.spout.pending" 
>> nil, "storm.zookeeper.retry.interval" 1000, 
>> "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" 
>> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" 
>> (1 2 3), "topology.debug" false, "nimbus.task.launch.secs" 120, 
>> "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, 
>> "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" 
>> "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, 
>> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" 
>> "backtype.storm.serialization.types.ListDelegateSerializer", 
>> "topology.disruptor.wait.strategy" 
>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, 
>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" 
>> "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 
>> 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 
>> 5, "storm.thrift.transport" 
>> "backtype.storm.security.auth.SimpleTransportPlugin", 
>> "topology.state.synchronization.timeout.secs" 60, 
>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, 
>> "storm.messaging.transport" "backtype.storm.messaging.zmq", 
>> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, 
>> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 
>> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", 
>> "topology.optimize" true, "topology.max.task.parallelism" nil}
>> 1576 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl - 
>> Starting
>> 1582 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state 
>> update: :connected:none
>> 1590 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl - 
>> Starting
>> 1632 [main] INFO  backtype.storm.daemon.supervisor - Starting supervisor 
>> with id 944e6152-ca58-4d2b-8325-94ac98f43995 at host DO-mq-dev
>> 1636 [main] INFO  backtype.storm.daemon.supervisor - Starting Supervisor 
>> with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", 
>> "topology.tick.tuple.freq.secs" nil, 
>> "topology.builtin.metrics.bucket.size.secs" 60, 
>> "topology.fall.back.on.java.serialization" true, 
>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, 
>> "topology.skip.missing.kryo.registrations" true, 
>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", 
>> "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, 
>> "topology.trident.batch.emit.interval.millis" 50, "nimbus.monitor.freq.secs" 
>> 10, "logviewer.childopts" "-Xmx128m", "java.library.path" 
>> "/usr/lib/jvm/java-7-openjdk-amd64", "topology.executor.send.buffer.size" 
>> 1024, "storm.local.dir" "/tmp/d0aeb5f4-0830-4efd-be7f-bc40d5b66912", 
>> "storm.messaging.netty.buffer_size" 5242880, 
>> "supervisor.worker.start.timeout.secs" 120, 
>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 
>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, 
>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "10.100.70.128", 
>> "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, 
>> "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 
>> 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" 
>> "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, 
>> "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, 
>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" 
>> "/transactional", "topology.acker.executors" nil, 
>> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, 
>> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", 
>> "supervisor.heartbeat.frequency.secs" 5, 
>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, 
>> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", 
>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, 
>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, 
>> "topology.spout.wait.strategy" 
>> "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.max.spout.pending" 
>> nil, "storm.zookeeper.retry.interval" 1000, 
>> "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" 
>> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" 
>> (4 5 6), "topology.debug" false, "nimbus.task.launch.secs" 120, 
>> "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, 
>> "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" 
>> "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, 
>> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" 
>> "backtype.storm.serialization.types.ListDelegateSerializer", 
>> "topology.disruptor.wait.strategy" 
>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, 
>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" 
>> "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 
>> 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 
>> 5, "storm.thrift.transport" 
>> "backtype.storm.security.auth.SimpleTransportPlugin", 
>> "topology.state.synchronization.timeout.secs" 60, 
>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, 
>> "storm.messaging.transport" "backtype.storm.messaging.zmq", 
>> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, 
>> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 
>> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", 
>> "topology.optimize" true, "topology.max.task.parallelism" nil}
>> 1638 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl - 
>> Starting
>> 1648 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state 
>> update: :connected:none
>> 1690 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl - 
>> Starting
>> 1740 [main] INFO  backtype.storm.daemon.supervisor - Starting supervisor 
>> with id e8303ca7-9cc4-4551-8387-7559fc3c53fc at host DO-mq-dev
>> 1944 [main] INFO  backtype.storm.daemon.nimbus - Received topology 
>> submission for kafka with conf {"topology.max.task.parallelism" nil, 
>> "topology.acker.executors" nil, "topology.kryo.register" 
>> {"storm.trident.topology.TransactionAttempt" nil}, 
>> "topology.kryo.decorators" (), "topology.name" "kafka", "storm.id" 
>> "kafka-1-1407257070", "topology.debug" true}
>> 1962 [main] INFO  backtype.storm.daemon.nimbus - Activating kafka: 
>> kafka-1-1407257070
>> 2067 [main] INFO  backtype.storm.scheduler.EvenScheduler - Available slots: 
>> (["944e6152-ca58-4d2b-8325-94ac98f43995" 1] 
>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 2] 
>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 3] 
>> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 4] 
>> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 5] 
>> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 6])
>> 2088 [main] INFO  backtype.storm.daemon.nimbus - Setting new assignment for 
>> topology id kafka-1-1407257070: 
>> #backtype.storm.daemon.common.Assignment{:master-code-dir 
>> "/tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9/nimbus/stormdist/kafka-1-1407257070",
>>  :node->host {"944e6152-ca58-4d2b-8325-94ac98f43995" "DO-mq-dev"}, 
>> :executor->node+port {[3 3] ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [5 
>> 5] ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [4 4] 
>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [2 2] 
>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [1 1] 
>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1]}, :executor->start-time-secs {[1 
>> 1] 1407257070, [2 2] 1407257070, [4 4] 1407257070, [5 5] 1407257070, [3 3] 
>> 1407257070}}
>> 2215 [main] INFO  backtype.storm.daemon.nimbus - Shutting down master
>> 2223 [main] INFO  backtype.storm.daemon.nimbus - Shut down master
>> 2239 [main] INFO  backtype.storm.daemon.supervisor - Shutting down 
>> supervisor 944e6152-ca58-4d2b-8325-94ac98f43995
>> 2240 [Thread-6] INFO  backtype.storm.event - Event manager interrupted
>> 2241 [Thread-7] INFO  backtype.storm.event - Event manager interrupted
>> 2248 [main] INFO  backtype.storm.daemon.supervisor - Shutting down 
>> supervisor e8303ca7-9cc4-4551-8387-7559fc3c53fc
>> 2248 [Thread-9] INFO  backtype.storm.event - Event manager interrupted
>> 2248 [Thread-10] INFO  backtype.storm.event - Event manager interrupted
>> 2256 [main] INFO  backtype.storm.testing - Shutting down in process zookeeper
>> 2257 [main] INFO  backtype.storm.testing - Done shutting down in process 
>> zookeeper
>> 2258 [main] INFO  backtype.storm.testing - Deleting temporary path 
>> /tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9
>> 2259 [main] INFO  backtype.storm.testing - Deleting temporary path 
>> /tmp/dd37d0cc-79b3-4f23-b6a5-3bcf5a9f0879
>> 2260 [main] INFO  backtype.storm.testing - Deleting temporary path 
>> /tmp/3e515769-ebf5-4085-a6bf-35f4ad8be388
>> 2261 [main] INFO  backtype.storm.testing - Deleting temporary path 
>> /tmp/d0aeb5f4-0830-4efd-be7f-bc40d5b66912
>> 
>> Anyone can help me locate what the problem is? I really need to walk through 
>> this step in order to be able to replace .each(printStream()) with other 
>> functions.
>> 
>> 
>> Thanks
>> 
>> Alec
>>  
>> On Aug 4, 2014, at 4:24 AM, Marcelo Valle <mva...@redoop.org> wrote:
>> 
>>> hello,
>>> 
>>> you can check your .jar application with command " jar tf " to see if class 
>>> kafka/api/OffsetRequest.class is part of the jar.
>>> If not you can try to copy kafka-2.9.2-0.8.0.jar (or version you are using) 
>>> in storm_lib directory
>>> 
>>> Marcelo
>>> 
>>> 
>>> 2014-07-31 23:33 GMT+02:00 Sa Li <sa.in.v...@gmail.com>:
>>> Hi, all
>>> 
>>> I am running a kafka-spout code in storm-server, the pom is 
>>> 
>>>   <groupId>org.apache.kafka</groupId>
>>>         <artifactId>kafka_2.9.2</artifactId>
>>>         <version>0.8.0</version>
>>>                 <scope>provided</scope>
>>> 
>>>                 <exclusions>
>>>                   <exclusion>
>>>                        <groupId>org.apache.zookeeper</groupId>
>>>                        <artifactId>zookeeper</artifactId>
>>>                   </exclusion>
>>>                   <exclusion>
>>>                        <groupId>log4j</groupId>
>>>                        <artifactId>log4j</artifactId>
>>>                   </exclusion>
>>>                 </exclusions>
>>> 
>>>     </dependency>
>>> 
>>>             <!-- Storm-Kafka compiled -->
>>> 
>>>         <dependency>
>>>             <artifactId>storm-kafka</artifactId>
>>>             <groupId>org.apache.storm</groupId>
>>>             <version>0.9.2-incubating</version>
>>>             <scope>*compile*</scope>
>>>         </dependency>
>>> 
>>> I can mvn package it, but when I run it 
>>> root@DO-mq-dev:/home/stuser/kafkaprj/kafka-storm-bitmap# storm jar 
>>> target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
>>> storm.artemis.KafkaConsumerTopology KafkaConsumerTopology
>>>  
>>> 
>>> I am getting such error
>>> 
>>> 1657 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl - 
>>> Starting
>>> 1682 [main] INFO  backtype.storm.daemon.supervisor - Starting supervisor 
>>> with id a66e0c61-a951-4c1b-a43f-3fb0d12cb226 at host DO-mq-dev
>>> 1698 [main] ERROR org.apache.zookeeper.server.NIOServerCnxn - Thread 
>>> Thread[main,5,main] died
>>> java.lang.NoClassDefFoundError: kafka/api/OffsetRequest
>>>     at storm.artemis.kafka.KafkaConfig.<init>(KafkaConfig.java:26) 
>>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>>     at 
>>> storm.artemis.kafka.trident.TridentKafkaConfig.<init>(TridentKafkaConfig.java:13)
>>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>>     at 
>>> storm.artemis.KafkaConsumerTopology.buildTopology(KafkaConsumerTopology.java:115)
>>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>>     at 
>>> storm.artemis.KafkaConsumerTopology.main(KafkaConsumerTopology.java:144) 
>>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>> Caused by: java.lang.ClassNotFoundException: kafka.api.OffsetRequest
>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366) ~[na:1.7.0_55]
>>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355) ~[na:1.7.0_55]
>>>     at java.security.AccessController.doPrivileged(Native Method) 
>>> ~[na:1.7.0_55]
>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354) 
>>> ~[na:1.7.0_55]
>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425) ~[na:1.7.0_55]
>>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) 
>>> ~[na:1.7.0_55]
>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ~[na:1.7.0_55]
>>> 
>>> 
>>> 
>>> 
>>> I try to poke around online, could not find a solution for it, any idea 
>>> about that?
>>> 
>>> 
>>> Thanks
>>> 
>>> Alec
>>> 
>>> 
>>> 
>>> 
>> 
> 

Attachment: signature.asc
Description: Message signed with OpenPGP using GPGMail

Reply via email to