Can you let the topology run for 120 seconds or so? In my experience the Kafka 
bolt/spout has a lot of startup latency while it reads from and writes to 
ZooKeeper and initializes its connections. On my Mac it takes about 15 seconds 
before the spout is actually opened. 
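
For example, the main() you posted further down could be adjusted roughly like 
this (untested sketch; only the sleep changes):

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setDebug(true);
        conf.setMaxSpoutPending(1);
        conf.setMaxTaskParallelism(3);
        LocalDRPC drpc = new LocalDRPC();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("kafka", conf, buildTopology(drpc));
        // Give the spout time to connect to ZooKeeper and start emitting
        // before tearing the local cluster down.
        Thread.sleep(120 * 1000);
        cluster.shutdown();
    }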

Thanks
Parth
On Aug 5, 2014, at 1:11 PM, Sa Li <sa.in.v...@gmail.com> wrote:

> If I set the sleep time to 1000 milliseconds, I get this error:
> 
> 3067 [main] INFO  backtype.storm.testing - Deleting temporary path 
> /tmp/0f1851f1-9499-48a5-817e-41712921d054
> 3163 [Thread-10-EventThread] INFO  
> com.netflix.curator.framework.state.ConnectionStateManager - State change: 
> SUSPENDED
> 3163 [ConnectionStateManager-0] WARN  
> com.netflix.curator.framework.state.ConnectionStateManager - There are no 
> ConnectionStateListeners registered.
> 3164 [Thread-10-EventThread] WARN  backtype.storm.cluster - Received event 
> :disconnected::none: with disconnected Zookeeper.
> 3636 [Thread-10-SendThread(localhost:2000)] WARN  
> org.apache.zookeeper.ClientCnxn - Session 0x147a7c868ef000b for server null, 
> unexpected error, closing socket connection and attempting reconnect
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
> ~[na:1.7.0_55]
>         at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) 
> ~[na:1.7.0_55]
>         at 
> org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1119) 
> ~[zookeeper-3.3.3.jar:3.3.3-1073969]
> 4877 [Thread-10-SendThread(localhost:2000)] WARN  
> org.apache.zookeeper.ClientCnxn - Session 0x147a7c868ef000b for server null, 
> unexpected error, closing socket connection and attempting reconnect
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
> ~[na:1.7.0_55]
>         at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) 
> ~[na:1.7.0_55]
>         at 
> org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1119) 
> ~[zookeeper-3.3.3.jar:3.3.3-1073969]
> 5566 [Thread-10-SendThread(localhost:2000)] WARN  
> org.apache.zookeeper.ClientCnxn - Session 0x147a7c868ef000b for server null, 
> unexpected error, closing socket connection and attempting reconnect
> java.net.ConnectException: Connection refused
> 
> It seems it is not even connecting to ZooKeeper. Is there a way to confirm 
> the ZooKeeper connection?
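> 
> One way I could confirm it might be a tiny standalone probe along these lines 
> (untested sketch; the address here is an assumption, it should point at 
> whichever ZooKeeper the spout actually uses, e.g. localhost:2181 or the 
> in-process one on port 2000):
> 
> import org.apache.zookeeper.WatchedEvent;
> import org.apache.zookeeper.Watcher;
> import org.apache.zookeeper.ZooKeeper;
> 
> public class ZkProbe {
>     public static void main(String[] args) throws Exception {
>         // Open a plain client session and print the resulting state;
>         // CONNECTED means the server is reachable on that address/port.
>         ZooKeeper zk = new ZooKeeper("localhost:2181", 20000, new Watcher() {
>             public void process(WatchedEvent event) {
>                 System.out.println("ZK event: " + event);
>             }
>         });
>         Thread.sleep(2000); // allow the asynchronous connect to complete
>         System.out.println("ZK state: " + zk.getState());
>         zk.close();
>     }
> }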
> 
> Thanks a lot
> 
> Alec
> 
> On Aug 5, 2014, at 12:58 PM, Sa Li <sa.in.v...@gmail.com> wrote:
> 
>> Thank you very much for your reply, Taylor. I tried increasing the sleep time 
>> to 1 second and then 10 seconds, but I get the following error; it looks like 
>> an async loop error. Any idea what causes it?
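>> 
>> One quick check for this kind of NoSuchMethodError is to print which zookeeper 
>> jar actually ends up on the classpath (untested sketch):
>> 
>> // If this prints the zookeeper-3.3.3 jar while Curator expects a newer
>> // ZooKeeper constructor, the two versions on the classpath are in conflict.
>> System.out.println(org.apache.zookeeper.ZooKeeper.class
>>         .getProtectionDomain().getCodeSource().getLocation());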
>> 
>> 3053 [Thread-19-$spoutcoord-spout0] INFO  
>> org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
>> 3058 [Thread-19-$spoutcoord-spout0] ERROR backtype.storm.util - Async loop 
>> died!
>> java.lang.NoSuchMethodError: 
>> org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
>>         at 
>> org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:154)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.ConnectionState.reset(ConnectionState.java:219) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.ConnectionState.start(ConnectionState.java:103) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:190)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:256)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.artemis.kafka.DynamicBrokersReader.<init>(DynamicBrokersReader.java:36)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.artemis.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:24) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.artemis.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:40) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.artemis.kafka.trident.Coordinator.<init>(Coordinator.java:16) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.artemis.kafka.trident.OpaqueTridentKafkaSpout.getCoordinator(OpaqueTridentKafkaSpout.java:29)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Coordinator.<init>(OpaquePartitionedTridentSpoutExecutor.java:27)
>>  ~[storm-core-0.9.0.1.jar:na]
>>         at 
>> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getCoordinator(OpaquePartitionedTridentSpoutExecutor.java:166)
>>  ~[storm-core-0.9.0.1.jar:na]
>>         at 
>> storm.trident.spout.TridentSpoutCoordinator.prepare(TridentSpoutCoordinator.java:38)
>>  ~[storm-core-0.9.0.1.jar:na]
>>         at 
>> backtype.storm.topology.BasicBoltExecutor.prepare(BasicBoltExecutor.java:26) 
>> ~[storm-core-0.9.0.1.jar:na]
>>         at 
>> backtype.storm.daemon.executor$fn__3498$fn__3510.invoke(executor.clj:674) 
>> ~[storm-core-0.9.0.1.jar:na]
>>         at backtype.storm.util$async_loop$fn__444.invoke(util.clj:401) 
>> ~[storm-core-0.9.0.1.jar:na]
>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
>>         at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
>> 3058 [Thread-25-spout0] ERROR backtype.storm.util - Async loop died!
>> java.lang.NoSuchMethodError: 
>> org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
>>         at 
>> org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:154)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.ConnectionState.reset(ConnectionState.java:219) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.ConnectionState.start(ConnectionState.java:103) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:190)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:256)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.artemis.kafka.DynamicBrokersReader.<init>(DynamicBrokersReader.java:36)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.artemis.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:24) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.artemis.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:40) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.artemis.kafka.trident.TridentKafkaEmitter.<init>(TridentKafkaEmitter.java:44)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.artemis.kafka.trident.OpaqueTridentKafkaSpout.getEmitter(OpaqueTridentKafkaSpout.java:24)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.<init>(OpaquePartitionedTridentSpoutExecutor.java:69)
>>  ~[storm-core-0.9.0.1.jar:na]
>>         at 
>> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:171)
>>  ~[storm-core-0.9.0.1.jar:na]
>>         at 
>> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:20)
>>  ~[storm-core-0.9.0.1.jar:na]
>>         at 
>> storm.trident.spout.TridentSpoutExecutor.prepare(TridentSpoutExecutor.java:43)
>>  ~[storm-core-0.9.0.1.jar:na]
>>         at 
>> storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:214)
>>  ~[storm-core-0.9.0.1.jar:na]
>>         at 
>> backtype.storm.daemon.executor$fn__3498$fn__3510.invoke(executor.clj:674) 
>> ~[storm-core-0.9.0.1.jar:na]
>>         at backtype.storm.util$async_loop$fn__444.invoke(util.clj:401) 
>> ~[storm-core-0.9.0.1.jar:na]
>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
>>         at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
>> 3059 [Thread-19-$spoutcoord-spout0] ERROR backtype.storm.daemon.executor -
>> java.lang.NoSuchMethodError: 
>> org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
>>         at 
>> org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:154)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.ConnectionState.reset(ConnectionState.java:219) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.ConnectionState.start(ConnectionState.java:103) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:190)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:256)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.artemis.kafka.DynamicBrokersReader.<init>(DynamicBrokersReader.java:36)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.artemis.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:24) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.artemis.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:40) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.artemis.kafka.trident.Coordinator.<init>(Coordinator.java:16) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.artemis.kafka.trident.OpaqueTridentKafkaSpout.getCoordinator(OpaqueTridentKafkaSpout.java:29)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Coordinator.<init>(OpaquePartitionedTridentSpoutExecutor.java:27)
>>  ~[storm-core-0.9.0.1.jar:na]
>>         at 
>> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getCoordinator(OpaquePartitionedTridentSpoutExecutor.java:166)
>>  ~[storm-core-0.9.0.1.jar:na]
>>         at 
>> storm.trident.spout.TridentSpoutCoordinator.prepare(TridentSpoutCoordinator.java:38)
>>  ~[storm-core-0.9.0.1.jar:na]
>>         at 
>> backtype.storm.topology.BasicBoltExecutor.prepare(BasicBoltExecutor.java:26) 
>> ~[storm-core-0.9.0.1.jar:na]
>>         at 
>> backtype.storm.daemon.executor$fn__3498$fn__3510.invoke(executor.clj:674) 
>> ~[storm-core-0.9.0.1.jar:na]
>>         at backtype.storm.util$async_loop$fn__444.invoke(util.clj:401) 
>> ~[storm-core-0.9.0.1.jar:na]
>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
>>         at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
>> 3059 [Thread-25-spout0] ERROR backtype.storm.daemon.executor -
>> java.lang.NoSuchMethodError: 
>> org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
>>         at 
>> org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:154)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.ConnectionState.reset(ConnectionState.java:219) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.ConnectionState.start(ConnectionState.java:103) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:190)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:256)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.artemis.kafka.DynamicBrokersReader.<init>(DynamicBrokersReader.java:36)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.artemis.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:24) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.artemis.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:40) 
>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.artemis.kafka.trident.TridentKafkaEmitter.<init>(TridentKafkaEmitter.java:44)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.artemis.kafka.trident.OpaqueTridentKafkaSpout.getEmitter(OpaqueTridentKafkaSpout.java:24)
>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>         at 
>> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.<init>(OpaquePartitionedTridentSpoutExecutor.java:69)
>>  ~[storm-core-0.9.0.1.jar:na]
>>         at 
>> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:171)
>>  ~[storm-core-0.9.0.1.jar:na]
>>         at 
>> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:20)
>>  ~[storm-core-0.9.0.1.jar:na]
>>         at 
>> storm.trident.spout.TridentSpoutExecutor.prepare(TridentSpoutExecutor.java:43)
>>  ~[storm-core-0.9.0.1.jar:na]
>>         at 
>> storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:214)
>>  ~[storm-core-0.9.0.1.jar:na]
>>         at 
>> backtype.storm.daemon.executor$fn__3498$fn__3510.invoke(executor.clj:674) 
>> ~[storm-core-0.9.0.1.jar:na]
>>         at backtype.storm.util$async_loop$fn__444.invoke(util.clj:401) 
>> ~[storm-core-0.9.0.1.jar:na]
>>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
>>         at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
>> 3059 [Thread-7] INFO  backtype.storm.daemon.worker - Worker has topology 
>> config {"storm.id" "kafka-1-1407268492", "dev.zookeeper.path" 
>> "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, 
>> "topology.builtin.metrics.bucket.size.secs" 60, 
>> "topology.fall.back.on.java.serialization" true, 
>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, 
>> "topology.skip.missing.kryo.registrations" true, 
>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", 
>> "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, 
>> "topology.trident.batch.emit.interval.millis" 50, "nimbus.monitor.freq.secs" 
>> 10, "logviewer.childopts" "-Xmx128m", "java.library.path" 
>> "/usr/lib/jvm/java-7-openjdk-amd64", "topology.executor.send.buffer.size" 
>> 1024, "storm.local.dir" "/tmp/ca948198-69df-440b-8acb-6dfc4db6c288", 
>> "storm.messaging.netty.buffer_size" 5242880, 
>> "supervisor.worker.start.timeout.secs" 120, 
>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 
>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, 
>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "10.100.70.128", 
>> "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, 
>> "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 
>> 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" 
>> "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, 
>> "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, 
>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" 
>> "/transactional", "topology.acker.executors" nil, "topology.kryo.decorators" 
>> (), "topology.name" "kafka", "topology.transfer.buffer.size" 1024, 
>> "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" 
>> "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, 
>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, 
>> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", 
>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, 
>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, 
>> "topology.spout.wait.strategy" 
>> "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.max.spout.pending" 
>> nil, "storm.zookeeper.retry.interval" 1000, 
>> "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" 
>> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" 
>> (1 2 3), "topology.debug" true, "nimbus.task.launch.secs" 120, 
>> "nimbus.supervisor.timeout.secs" 60, "topology.kryo.register" 
>> {"storm.trident.topology.TransactionAttempt" nil}, 
>> "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, 
>> "topology.workers" 1, "supervisor.childopts" "-Xmx256m", 
>> "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, 
>> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" 
>> "backtype.storm.serialization.types.ListDelegateSerializer", 
>> "topology.disruptor.wait.strategy" 
>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, 
>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" 
>> "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 
>> 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 
>> 5, "storm.thrift.transport" 
>> "backtype.storm.security.auth.SimpleTransportPlugin", 
>> "topology.state.synchronization.timeout.secs" 60, 
>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, 
>> "storm.messaging.transport" "backtype.storm.messaging.zmq", 
>> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, 
>> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 
>> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", 
>> "topology.optimize" true, "topology.max.task.parallelism" nil}
>> 3059 [Thread-7] INFO  backtype.storm.daemon.worker - Worker 
>> 64335058-7f94-447f-bc0a-5107084789a0 for storm kafka-1-1407268492 on 
>> cf2964b3-7655-4a33-88a1-f6e0ceb6f9ed:1 has finished loading
>> 3164 [Thread-29-$mastercoord-bg0] INFO  
>> com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
>> 3173 [Thread-25-spout0] INFO  backtype.storm.util - Halting process: 
>> ("Worker died")
>> 3173 [Thread-19-$spoutcoord-spout0] INFO  backtype.storm.util - Halting 
>> process: ("Worker died")
>> 
>> Thanks
>> 
>> Alec
>> 
>> On Aug 5, 2014, at 10:26 AM, P. Taylor Goetz <ptgo...@gmail.com> wrote:
>> 
>>> You are only sleeping for 100 milliseconds before shutting down the local 
>>> cluster, which is probably not long enough for the topology to come up and 
>>> start processing messages. Try increasing the sleep time to something like 
>>> 10 seconds.
>>> 
>>> You can also reduce startup time with the following JVM flag:
>>> 
>>> -Djava.net.preferIPv4Stack=true
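>>> 
>>> When launching from main() with a LocalCluster, a rough equivalent (sketch) is 
>>> to set the property before anything opens a socket:
>>> 
>>> // Same effect as passing -Djava.net.preferIPv4Stack=true on the command line;
>>> // it must run before any networking classes are initialized.
>>> System.setProperty("java.net.preferIPv4Stack", "true");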
>>> 
>>> - Taylor
>>> 
>>> On Aug 5, 2014, at 1:16 PM, Sa Li <sa.in.v...@gmail.com> wrote:
>>> 
>>>> Sorry, here is the storm topology:
>>>> 
>>>>> TridentTopology topology = new TridentTopology();
>>>>> BrokerHosts zk = new ZkHosts("localhost");
>>>>> TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
>>>>> spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>> OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On Aug 5, 2014, at 9:56 AM, Sa Li <sa.in.v...@gmail.com> wrote:
>>>> 
>>>>> Thank you very much, Marcelo, that indeed worked; I can now run my code 
>>>>> without errors. However, one more thing keeps bothering me. Here is my code:
>>>>> 
>>>>> public static class PrintStream implements Filter {
>>>>> 
>>>>>     @SuppressWarnings("rawtypes")
>>>>>     @Override
>>>>>     public void prepare(Map conf, TridentOperationContext context) {
>>>>>     }
>>>>> 
>>>>>     @Override
>>>>>     public void cleanup() {
>>>>>     }
>>>>> 
>>>>>     @Override
>>>>>     public boolean isKeep(TridentTuple tuple) {
>>>>>         System.out.println(tuple);
>>>>>         return true;
>>>>>     }
>>>>> }
>>>>> 
>>>>> public static StormTopology buildTopology(LocalDRPC drpc) throws IOException {
>>>>> 
>>>>>     TridentTopology topology = new TridentTopology();
>>>>>     BrokerHosts zk = new ZkHosts("localhost");
>>>>>     TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "ingest_test");
>>>>>     spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>>     OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
>>>>> 
>>>>>     topology.newStream("kafka", spout)
>>>>>             .each(new Fields("str"), new PrintStream());
>>>>> 
>>>>>     return topology.build();
>>>>> }
>>>>> 
>>>>> public static void main(String[] args) throws Exception {
>>>>> 
>>>>>     Config conf = new Config();
>>>>>     conf.setDebug(true);
>>>>>     conf.setMaxSpoutPending(1);
>>>>>     conf.setMaxTaskParallelism(3);
>>>>>     LocalDRPC drpc = new LocalDRPC();
>>>>>     LocalCluster cluster = new LocalCluster();
>>>>>     cluster.submitTopology("kafka", conf, buildTopology(drpc));
>>>>>     Thread.sleep(100);
>>>>>     cluster.shutdown();
>>>>> }
>>>>> 
>>>>> What I expect is quite simple: print out the messages I collect from a Kafka 
>>>>> producer playback process that is running separately. The topic is listed as:
>>>>> 
>>>>> root@DO-mq-dev:/etc/kafka# bin/kafka-list-topic.sh --zookeeper 
>>>>> localhost:2181
>>>>> topic: topictest        partition: 0    leader: 1       replicas: 1,3,2    isr: 1,3,2
>>>>> topic: topictest        partition: 1    leader: 2       replicas: 2,1,3    isr: 2,1,3
>>>>> topic: topictest        partition: 2    leader: 3       replicas: 3,2,1    isr: 3,2,1
>>>>> topic: topictest        partition: 3    leader: 1       replicas: 1,2,3    isr: 1,2,3
>>>>> topic: topictest        partition: 4    leader: 2       replicas: 2,3,1    isr: 2,3,1
>>>>> 
>>>>> When I run the code, this is what I see on the screen; there seems to be no 
>>>>> error, but no messages are printed out either:
>>>>> 
>>>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>>> SLF4J: Found binding in 
>>>>> [jar:file:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>> SLF4J: Found binding in 
>>>>> [jar:file:/home/stuser/kafkaprj/kafka-storm-bitmap/target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
>>>>> explanation.
>>>>> Running: java -client -Dstorm.options= -Dstorm.home=/etc/storm-0.9.0.1 
>>>>> -Djava.library.path=/usr/lib/jvm/java-7-openjdk-amd64 -Dstorm.conf.file= 
>>>>> -cp 
>>>>> /etc/storm-0.9.0.1/storm-netty-0.9.0.1.jar:/etc/storm-0.9.0.1/storm-core-0.9.0.1.jar:/etc/storm-0.9.0.1/storm-console-logging-0.9.0.1.jar:/etc/storm-0.9.0.1/lib/log4j-over-slf4j-1.6.6.jar:/etc/storm-0.9.0.1/lib/commons-io-1.4.jar:/etc/storm-0.9.0.1/lib/joda-time-2.0.jar:/etc/storm-0.9.0.1/lib/tools.nrepl-0.2.3.jar:/etc/storm-0.9.0.1/lib/servlet-api-2.5.jar:/etc/storm-0.9.0.1/lib/curator-framework-1.0.1.jar:/etc/storm-0.9.0.1/lib/core.incubator-0.1.0.jar:/etc/storm-0.9.0.1/lib/jetty-6.1.26.jar:/etc/storm-0.9.0.1/lib/commons-codec-1.4.jar:/etc/storm-0.9.0.1/lib/servlet-api-2.5-20081211.jar:/etc/storm-0.9.0.1/lib/httpclient-4.1.1.jar:/etc/storm-0.9.0.1/lib/commons-exec-1.1.jar:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar:/etc/storm-0.9.0.1/lib/libthrift7-0.7.0-2.jar:/etc/storm-0.9.0.1/lib/minlog-1.2.jar:/etc/storm-0.9.0.1/lib/clojure-complete-0.2.3.jar:/etc/storm-0.9.0.1/lib/clojure-1.4.0.jar:/etc/storm-0.9.0.1/lib/asm-4.0.jar:/etc/storm-0.9.0.1/lib/mockito-all-1.9.5.jar:/etc/storm-0.9.0.1/lib/commons-fileupload-1.2.1.jar:/etc/storm-0.9.0.1/lib/clout-1.0.1.jar:/etc/storm-0.9.0.1/lib/ring-servlet-0.3.11.jar:/etc/storm-0.9.0.1/lib/ring-devel-0.3.11.jar:/etc/storm-0.9.0.1/lib/jgrapht-0.8.3.jar:/etc/storm-0.9.0.1/lib/snakeyaml-1.11.jar:/etc/storm-0.9.0.1/lib/reflectasm-1.07-shaded.jar:/etc/storm-0.9.0.1/lib/kryo-2.17.jar:/etc/storm-0.9.0.1/lib/ring-jetty-adapter-0.3.11.jar:/etc/storm-0.9.0.1/lib/compojure-1.1.3.jar:/etc/storm-0.9.0.1/lib/objenesis-1.2.jar:/etc/storm-0.9.0.1/lib/commons-logging-1.1.1.jar:/etc/storm-0.9.0.1/lib/tools.macro-0.1.0.jar:/etc/storm-0.9.0.1/lib/junit-3.8.1.jar:/etc/storm-0.9.0.1/lib/json-simple-1.1.jar:/etc/storm-0.9.0.1/lib/tools.cli-0.2.2.jar:/etc/storm-0.9.0.1/lib/curator-client-1.0.1.jar:/etc/storm-0.9.0.1/lib/jline-0.9.94.jar:/etc/storm-0.9.0.1/lib/zookeeper-3.3.3.jar:/etc/storm-0.9.0.1/lib/guava-13.0.jar:/etc/storm-0.9.0.1/lib/commons-lang-2.5.jar:/etc/storm-0.9.0.1/lib/carbonite-1.5.0.jar:/etc/storm-0.9.0.1/lib/ring-core-1.1.5.jar:/etc/storm-0.9.0.1/lib/jzmq-2.1.0.jar:/etc/storm-0.9.0.1/lib/hiccup-0.3.6.jar:/etc/storm-0.9.0.1/lib/tools.logging-0.2.3.jar:/etc/storm-0.9.0.1/lib/kafka_2.9.2-0.8.0.jar:/etc/storm-0.9.0.1/lib/clj-stacktrace-0.2.2.jar:/etc/storm-0.9.0.1/lib/math.numeric-tower-0.0.1.jar:/etc/storm-0.9.0.1/lib/slf4j-api-1.6.5.jar:/etc/storm-0.9.0.1/lib/netty-3.6.3.Final.jar:/etc/storm-0.9.0.1/lib/disruptor-2.10.1.jar:/etc/storm-0.9.0.1/lib/jetty-util-6.1.26.jar:/etc/storm-0.9.0.1/lib/httpcore-4.1.jar:/etc/storm-0.9.0.1/lib/logback-core-1.0.6.jar:/etc/storm-0.9.0.1/lib/clj-time-0.4.1.jar:target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:/etc/storm-0.9.0.1/conf:/etc/storm-0.9.0.1/bin
>>>>>  
>>>>> -Dstorm.jar=target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar
>>>>>  storm.artemis.KafkaConsumerTopology KafkaConsumerTopology
>>>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>>> SLF4J: Found binding in 
>>>>> [jar:file:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>> SLF4J: Found binding in 
>>>>> [jar:file:/home/stuser/kafkaprj/kafka-storm-bitmap/target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
>>>>> explanation.
>>>>> 1113 [main] INFO  backtype.storm.zookeeper - Starting inprocess zookeeper 
>>>>> at port 2000 and dir /tmp/dd37d0cc-79b3-4f23-b6a5-3bcf5a9f0879
>>>>> 1216 [main] INFO  backtype.storm.daemon.nimbus - Starting Nimbus with 
>>>>> conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", 
>>>>> "topology.tick.tuple.freq.secs" nil, 
>>>>> "topology.builtin.metrics.bucket.size.secs" 60, 
>>>>> "topology.fall.back.on.java.serialization" true, 
>>>>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, 
>>>>> "topology.skip.missing.kryo.registrations" true, 
>>>>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" 
>>>>> "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" 
>>>>> true, "topology.trident.batch.emit.interval.millis" 50, 
>>>>> "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", 
>>>>> "java.library.path" "/usr/lib/jvm/java-7-openjdk-amd64", 
>>>>> "topology.executor.send.buffer.size" 1024, "storm.local.dir" 
>>>>> "/tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9", 
>>>>> "storm.messaging.netty.buffer_size" 5242880, 
>>>>> "supervisor.worker.start.timeout.secs" 120, 
>>>>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 
>>>>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, 
>>>>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" 
>>>>> "10.100.70.128", "storm.messaging.netty.min_wait_ms" 100, 
>>>>> "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, 
>>>>> "topology.executor.receive.buffer.size" 1024, 
>>>>> "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", 
>>>>> "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" 
>>>>> true, "storm.messaging.netty.server_worker_threads" 1, 
>>>>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" 
>>>>> "/transactional", "topology.acker.executors" nil, 
>>>>> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, 
>>>>> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", 
>>>>> "supervisor.heartbeat.frequency.secs" 5, 
>>>>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 
>>>>> 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", 
>>>>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, 
>>>>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, 
>>>>> "topology.spout.wait.strategy" 
>>>>> "backtype.storm.spout.SleepSpoutWaitStrategy", 
>>>>> "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, 
>>>>> "topology.sleep.spout.wait.strategy.time.ms" 1, 
>>>>> "nimbus.topology.validator" 
>>>>> "backtype.storm.nimbus.DefaultTopologyValidator", 
>>>>> "supervisor.slots.ports" [6700 6701 6702 6703], "topology.debug" false, 
>>>>> "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, 
>>>>> "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, 
>>>>> "topology.workers" 1, "supervisor.childopts" "-Xmx256m", 
>>>>> "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, 
>>>>> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" 
>>>>> "backtype.storm.serialization.types.ListDelegateSerializer", 
>>>>> "topology.disruptor.wait.strategy" 
>>>>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, 
>>>>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" 
>>>>> "backtype.storm.serialization.DefaultKryoFactory", 
>>>>> "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, 
>>>>> "storm.zookeeper.retry.times" 5, "storm.thrift.transport" 
>>>>> "backtype.storm.security.auth.SimpleTransportPlugin", 
>>>>> "topology.state.synchronization.timeout.secs" 60, 
>>>>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 
>>>>> 600, "storm.messaging.transport" "backtype.storm.messaging.zmq", 
>>>>> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, 
>>>>> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 
>>>>> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", 
>>>>> "topology.optimize" true, "topology.max.task.parallelism" nil}
>>>>> 1219 [main] INFO  backtype.storm.daemon.nimbus - Using default scheduler
>>>>> 1237 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl 
>>>>> - Starting
>>>>> 1303 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state 
>>>>> update: :connected:none
>>>>> 1350 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl 
>>>>> - Starting
>>>>> 1417 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl 
>>>>> - Starting
>>>>> 1432 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state 
>>>>> update: :connected:none
>>>>> 1482 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl 
>>>>> - Starting
>>>>> 1484 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl 
>>>>> - Starting
>>>>> 1532 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state 
>>>>> update: :connected:none
>>>>> 1540 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl 
>>>>> - Starting
>>>>> 1568 [main] INFO  backtype.storm.daemon.supervisor - Starting Supervisor 
>>>>> with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", 
>>>>> "topology.tick.tuple.freq.secs" nil, 
>>>>> "topology.builtin.metrics.bucket.size.secs" 60, 
>>>>> "topology.fall.back.on.java.serialization" true, 
>>>>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, 
>>>>> "topology.skip.missing.kryo.registrations" true, 
>>>>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" 
>>>>> "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" 
>>>>> true, "topology.trident.batch.emit.interval.millis" 50, 
>>>>> "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", 
>>>>> "java.library.path" "/usr/lib/jvm/java-7-openjdk-amd64", 
>>>>> "topology.executor.send.buffer.size" 1024, "storm.local.dir" 
>>>>> "/tmp/3e515769-ebf5-4085-a6bf-35f4ad8be388", 
>>>>> "storm.messaging.netty.buffer_size" 5242880, 
>>>>> "supervisor.worker.start.timeout.secs" 120, 
>>>>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 
>>>>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, 
>>>>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" 
>>>>> "10.100.70.128", "storm.messaging.netty.min_wait_ms" 100, 
>>>>> "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, 
>>>>> "topology.executor.receive.buffer.size" 1024, 
>>>>> "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", 
>>>>> "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" 
>>>>> true, "storm.messaging.netty.server_worker_threads" 1, 
>>>>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" 
>>>>> "/transactional", "topology.acker.executors" nil, 
>>>>> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, 
>>>>> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", 
>>>>> "supervisor.heartbeat.frequency.secs" 5, 
>>>>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 
>>>>> 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", 
>>>>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, 
>>>>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, 
>>>>> "topology.spout.wait.strategy" 
>>>>> "backtype.storm.spout.SleepSpoutWaitStrategy", 
>>>>> "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, 
>>>>> "topology.sleep.spout.wait.strategy.time.ms" 1, 
>>>>> "nimbus.topology.validator" 
>>>>> "backtype.storm.nimbus.DefaultTopologyValidator", 
>>>>> "supervisor.slots.ports" (1 2 3), "topology.debug" false, 
>>>>> "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, 
>>>>> "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, 
>>>>> "topology.workers" 1, "supervisor.childopts" "-Xmx256m", 
>>>>> "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, 
>>>>> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" 
>>>>> "backtype.storm.serialization.types.ListDelegateSerializer", 
>>>>> "topology.disruptor.wait.strategy" 
>>>>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, 
>>>>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" 
>>>>> "backtype.storm.serialization.DefaultKryoFactory", 
>>>>> "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, 
>>>>> "storm.zookeeper.retry.times" 5, "storm.thrift.transport" 
>>>>> "backtype.storm.security.auth.SimpleTransportPlugin", 
>>>>> "topology.state.synchronization.timeout.secs" 60, 
>>>>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 
>>>>> 600, "storm.messaging.transport" "backtype.storm.messaging.zmq", 
>>>>> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, 
>>>>> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 
>>>>> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", 
>>>>> "topology.optimize" true, "topology.max.task.parallelism" nil}
>>>>> 1576 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl 
>>>>> - Starting
>>>>> 1582 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state 
>>>>> update: :connected:none
>>>>> 1590 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl 
>>>>> - Starting
>>>>> 1632 [main] INFO  backtype.storm.daemon.supervisor - Starting supervisor 
>>>>> with id 944e6152-ca58-4d2b-8325-94ac98f43995 at host DO-mq-dev
>>>>> 1636 [main] INFO  backtype.storm.daemon.supervisor - Starting Supervisor 
>>>>> with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", 
>>>>> "topology.tick.tuple.freq.secs" nil, 
>>>>> "topology.builtin.metrics.bucket.size.secs" 60, 
>>>>> "topology.fall.back.on.java.serialization" true, 
>>>>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, 
>>>>> "topology.skip.missing.kryo.registrations" true, 
>>>>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" 
>>>>> "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" 
>>>>> true, "topology.trident.batch.emit.interval.millis" 50, 
>>>>> "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", 
>>>>> "java.library.path" "/usr/lib/jvm/java-7-openjdk-amd64", 
>>>>> "topology.executor.send.buffer.size" 1024, "storm.local.dir" 
>>>>> "/tmp/d0aeb5f4-0830-4efd-be7f-bc40d5b66912", 
>>>>> "storm.messaging.netty.buffer_size" 5242880, 
>>>>> "supervisor.worker.start.timeout.secs" 120, 
>>>>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 
>>>>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, 
>>>>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" 
>>>>> "10.100.70.128", "storm.messaging.netty.min_wait_ms" 100, 
>>>>> "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, 
>>>>> "topology.executor.receive.buffer.size" 1024, 
>>>>> "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", 
>>>>> "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" 
>>>>> true, "storm.messaging.netty.server_worker_threads" 1, 
>>>>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" 
>>>>> "/transactional", "topology.acker.executors" nil, 
>>>>> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, 
>>>>> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", 
>>>>> "supervisor.heartbeat.frequency.secs" 5, 
>>>>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 
>>>>> 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", 
>>>>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, 
>>>>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, 
>>>>> "topology.spout.wait.strategy" 
>>>>> "backtype.storm.spout.SleepSpoutWaitStrategy", 
>>>>> "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, 
>>>>> "topology.sleep.spout.wait.strategy.time.ms" 1, 
>>>>> "nimbus.topology.validator" 
>>>>> "backtype.storm.nimbus.DefaultTopologyValidator", 
>>>>> "supervisor.slots.ports" (4 5 6), "topology.debug" false, 
>>>>> "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, 
>>>>> "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, 
>>>>> "topology.workers" 1, "supervisor.childopts" "-Xmx256m", 
>>>>> "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, 
>>>>> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" 
>>>>> "backtype.storm.serialization.types.ListDelegateSerializer", 
>>>>> "topology.disruptor.wait.strategy" 
>>>>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, 
>>>>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" 
>>>>> "backtype.storm.serialization.DefaultKryoFactory", 
>>>>> "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, 
>>>>> "storm.zookeeper.retry.times" 5, "storm.thrift.transport" 
>>>>> "backtype.storm.security.auth.SimpleTransportPlugin", 
>>>>> "topology.state.synchronization.timeout.secs" 60, 
>>>>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 
>>>>> 600, "storm.messaging.transport" "backtype.storm.messaging.zmq", 
>>>>> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, 
>>>>> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 
>>>>> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", 
>>>>> "topology.optimize" true, "topology.max.task.parallelism" nil}
>>>>> 1638 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl 
>>>>> - Starting
>>>>> 1648 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state 
>>>>> update: :connected:none
>>>>> 1690 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl 
>>>>> - Starting
>>>>> 1740 [main] INFO  backtype.storm.daemon.supervisor - Starting supervisor 
>>>>> with id e8303ca7-9cc4-4551-8387-7559fc3c53fc at host DO-mq-dev
>>>>> 1944 [main] INFO  backtype.storm.daemon.nimbus - Received topology 
>>>>> submission for kafka with conf {"topology.max.task.parallelism" nil, 
>>>>> "topology.acker.executors" nil, "topology.kryo.register" 
>>>>> {"storm.trident.topology.TransactionAttempt" nil}, 
>>>>> "topology.kryo.decorators" (), "topology.name" "kafka", "storm.id" 
>>>>> "kafka-1-1407257070", "topology.debug" true}
>>>>> 1962 [main] INFO  backtype.storm.daemon.nimbus - Activating kafka: 
>>>>> kafka-1-1407257070
>>>>> 2067 [main] INFO  backtype.storm.scheduler.EvenScheduler - Available 
>>>>> slots: (["944e6152-ca58-4d2b-8325-94ac98f43995" 1] 
>>>>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 2] 
>>>>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 3] 
>>>>> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 4] 
>>>>> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 5] 
>>>>> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 6])
>>>>> 2088 [main] INFO  backtype.storm.daemon.nimbus - Setting new assignment 
>>>>> for topology id kafka-1-1407257070: 
>>>>> #backtype.storm.daemon.common.Assignment{:master-code-dir 
>>>>> "/tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9/nimbus/stormdist/kafka-1-1407257070",
>>>>>  :node->host {"944e6152-ca58-4d2b-8325-94ac98f43995" "DO-mq-dev"}, 
>>>>> :executor->node+port {[3 3] ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], 
>>>>> [5 5] ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [4 4] 
>>>>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [2 2] 
>>>>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [1 1] 
>>>>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1]}, :executor->start-time-secs 
>>>>> {[1 1] 1407257070, [2 2] 1407257070, [4 4] 1407257070, [5 5] 1407257070, 
>>>>> [3 3] 1407257070}}
>>>>> 2215 [main] INFO  backtype.storm.daemon.nimbus - Shutting down master
>>>>> 2223 [main] INFO  backtype.storm.daemon.nimbus - Shut down master
>>>>> 2239 [main] INFO  backtype.storm.daemon.supervisor - Shutting down 
>>>>> supervisor 944e6152-ca58-4d2b-8325-94ac98f43995
>>>>> 2240 [Thread-6] INFO  backtype.storm.event - Event manager interrupted
>>>>> 2241 [Thread-7] INFO  backtype.storm.event - Event manager interrupted
>>>>> 2248 [main] INFO  backtype.storm.daemon.supervisor - Shutting down 
>>>>> supervisor e8303ca7-9cc4-4551-8387-7559fc3c53fc
>>>>> 2248 [Thread-9] INFO  backtype.storm.event - Event manager interrupted
>>>>> 2248 [Thread-10] INFO  backtype.storm.event - Event manager interrupted
>>>>> 2256 [main] INFO  backtype.storm.testing - Shutting down in process 
>>>>> zookeeper
>>>>> 2257 [main] INFO  backtype.storm.testing - Done shutting down in process 
>>>>> zookeeper
>>>>> 2258 [main] INFO  backtype.storm.testing - Deleting temporary path 
>>>>> /tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9
>>>>> 2259 [main] INFO  backtype.storm.testing - Deleting temporary path 
>>>>> /tmp/dd37d0cc-79b3-4f23-b6a5-3bcf5a9f0879
>>>>> 2260 [main] INFO  backtype.storm.testing - Deleting temporary path 
>>>>> /tmp/3e515769-ebf5-4085-a6bf-35f4ad8be388
>>>>> 2261 [main] INFO  backtype.storm.testing - Deleting temporary path 
>>>>> /tmp/d0aeb5f4-0830-4efd-be7f-bc40d5b66912
>>>>> 
>>>>> Can anyone help me locate the problem? I really need to get past this step so 
>>>>> that I can replace .each(new PrintStream()) with other functions.
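>>>>> 
>>>>> For example, eventually I would like to swap the filter for a Trident Function, 
>>>>> roughly like this (sketch; the class name and output field are made up, and the 
>>>>> usual storm.trident.operation / backtype.storm.tuple imports are assumed):
>>>>> 
>>>>> public static class ToUpperCase extends BaseFunction {
>>>>>     @Override
>>>>>     public void execute(TridentTuple tuple, TridentCollector collector) {
>>>>>         // Emit an extra field derived from the Kafka message string.
>>>>>         collector.emit(new Values(tuple.getString(0).toUpperCase()));
>>>>>     }
>>>>> }
>>>>> 
>>>>> // topology.newStream("kafka", spout)
>>>>> //         .each(new Fields("str"), new ToUpperCase(), new Fields("upper"));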
>>>>> 
>>>>> 
>>>>> Thanks
>>>>> 
>>>>> Alec
>>>>>  
>>>>> On Aug 4, 2014, at 4:24 AM, Marcelo Valle <mva...@redoop.org> wrote:
>>>>> 
>>>>>> Hello,
>>>>>> 
>>>>>> You can check your application .jar with the command "jar tf" to see whether 
>>>>>> the class kafka/api/OffsetRequest.class is part of the jar. If it is not, you 
>>>>>> can try copying kafka_2.9.2-0.8.0.jar (or whichever version you are using) 
>>>>>> into Storm's lib directory.
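>>>>>> 
>>>>>> Alternatively, from the topology's main() you can check it programmatically, 
>>>>>> something like this (sketch):
>>>>>> 
>>>>>> // Throws ClassNotFoundException if kafka.api.OffsetRequest is missing;
>>>>>> // otherwise prints the jar it was actually loaded from.
>>>>>> Class<?> offsetRequest = Class.forName("kafka.api.OffsetRequest");
>>>>>> System.out.println(offsetRequest.getProtectionDomain().getCodeSource().getLocation());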
>>>>>> 
>>>>>> Marcelo
>>>>>> 
>>>>>> 
>>>>>> 2014-07-31 23:33 GMT+02:00 Sa Li <sa.in.v...@gmail.com>:
>>>>>> Hi all,
>>>>>> 
>>>>>> I am running Kafka-spout code on a Storm server; the relevant part of the pom is:
>>>>>> 
>>>>>>     <dependency>
>>>>>>         <groupId>org.apache.kafka</groupId>
>>>>>>         <artifactId>kafka_2.9.2</artifactId>
>>>>>>         <version>0.8.0</version>
>>>>>>         <scope>provided</scope>
>>>>>>         <exclusions>
>>>>>>             <exclusion>
>>>>>>                 <groupId>org.apache.zookeeper</groupId>
>>>>>>                 <artifactId>zookeeper</artifactId>
>>>>>>             </exclusion>
>>>>>>             <exclusion>
>>>>>>                 <groupId>log4j</groupId>
>>>>>>                 <artifactId>log4j</artifactId>
>>>>>>             </exclusion>
>>>>>>         </exclusions>
>>>>>>     </dependency>
>>>>>> 
>>>>>>     <!-- Storm-Kafka compiled -->
>>>>>> 
>>>>>>     <dependency>
>>>>>>         <artifactId>storm-kafka</artifactId>
>>>>>>         <groupId>org.apache.storm</groupId>
>>>>>>         <version>0.9.2-incubating</version>
>>>>>>         <scope>compile</scope>
>>>>>>     </dependency>
>>>>>> 
>>>>>> I can build it with mvn package, but when I run it:
>>>>>> root@DO-mq-dev:/home/stuser/kafkaprj/kafka-storm-bitmap# storm jar 
>>>>>> target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
>>>>>> storm.artemis.KafkaConsumerTopology KafkaConsumerTopology
>>>>>>  
>>>>>> 
>>>>>> I get this error:
>>>>>> 
>>>>>> 1657 [main] INFO  
>>>>>> com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
>>>>>> 1682 [main] INFO  backtype.storm.daemon.supervisor - Starting supervisor 
>>>>>> with id a66e0c61-a951-4c1b-a43f-3fb0d12cb226 at host DO-mq-dev
>>>>>> 1698 [main] ERROR org.apache.zookeeper.server.NIOServerCnxn - Thread 
>>>>>> Thread[main,5,main] died
>>>>>> java.lang.NoClassDefFoundError: kafka/api/OffsetRequest
>>>>>>  at storm.artemis.kafka.KafkaConfig.<init>(KafkaConfig.java:26) 
>>>>>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>>>>>  at 
>>>>>> storm.artemis.kafka.trident.TridentKafkaConfig.<init>(TridentKafkaConfig.java:13)
>>>>>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>>>>>  at 
>>>>>> storm.artemis.KafkaConsumerTopology.buildTopology(KafkaConsumerTopology.java:115)
>>>>>>  ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>>>>>  at 
>>>>>> storm.artemis.KafkaConsumerTopology.main(KafkaConsumerTopology.java:144) 
>>>>>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>>>>> Caused by: java.lang.ClassNotFoundException: kafka.api.OffsetRequest
>>>>>>  at java.net.URLClassLoader$1.run(URLClassLoader.java:366) ~[na:1.7.0_55]
>>>>>>  at java.net.URLClassLoader$1.run(URLClassLoader.java:355) ~[na:1.7.0_55]
>>>>>>  at java.security.AccessController.doPrivileged(Native Method) 
>>>>>> ~[na:1.7.0_55]
>>>>>>  at java.net.URLClassLoader.findClass(URLClassLoader.java:354) 
>>>>>> ~[na:1.7.0_55]
>>>>>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:425) ~[na:1.7.0_55]
>>>>>>  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) 
>>>>>> ~[na:1.7.0_55]
>>>>>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ~[na:1.7.0_55]
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> I tried poking around online but could not find a solution. Any idea what is 
>>>>>> wrong?
>>>>>> 
>>>>>> 
>>>>>> Thanks
>>>>>> 
>>>>>> Alec
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 

