Can you let the topology run for 120 seconds or so? In my experience the Kafka bolt/spout is slow to start, because it first has to read from and write to ZooKeeper and initialize its connections. On my Mac it takes about 15 seconds before the spout is actually opened.
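To check whether a ZooKeeper server is reachable at all before blaming spout start-up time, one option is ZooKeeper's four-letter "ruok" command, which a healthy server answers with "imok". The following is only a minimal sketch using plain sockets. The class name `ZkProbe`, the method `isServing`, and the default port 2181 (taken from the `kafka-list-topic.sh` command quoted below) are illustrative assumptions, not part of Storm or Kafka.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Storm, Kafka, or ZooKeeper).
public class ZkProbe {

    /**
     * Sends the four-letter command "ruok" and returns true only if the
     * server replies "imok". Any connection failure or timeout yields false.
     */
    public static boolean isServing(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            socket.setSoTimeout(timeoutMs);

            OutputStream out = socket.getOutputStream();
            out.write("ruok".getBytes(StandardCharsets.US_ASCII));
            out.flush();

            // Read up to four bytes; the server closes the socket after replying.
            InputStream in = socket.getInputStream();
            byte[] buf = new byte[4];
            int read = 0;
            while (read < buf.length) {
                int n = in.read(buf, read, buf.length - read);
                if (n < 0) {
                    break;
                }
                read += n;
            }
            return "imok".equals(new String(buf, 0, read, StandardCharsets.US_ASCII));
        } catch (IOException e) {
            // Connection refused, timeout, or reset: treat as "not serving".
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed defaults; pass host and port as arguments to override.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 2181;
        System.out.println(host + ":" + port + " serving=" + isServing(host, port, 2000));
    }
}
```

Running it with `localhost 2181` probes the ZooKeeper that Kafka uses. Running it with `localhost 2000` probes Storm's in-process ZooKeeper, which only exists while the local cluster is up. Newer ZooKeeper releases may require "ruok" to be enabled through `4lw.commands.whitelist`, so a `false` result there does not necessarily mean the server is down.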
Thanks
Parth

On Aug 5, 2014, at 1:11 PM, Sa Li <sa.in.v...@gmail.com> wrote:

> If I set the sleep time to 1000 milliseconds, I get this error:
>
> 3067 [main] INFO backtype.storm.testing - Deleting temporary path /tmp/0f1851f1-9499-48a5-817e-41712921d054
> 3163 [Thread-10-EventThread] INFO com.netflix.curator.framework.state.ConnectionStateManager - State change: SUSPENDED
> 3163 [ConnectionStateManager-0] WARN com.netflix.curator.framework.state.ConnectionStateManager - There are no ConnectionStateListeners registered.
> 3164 [Thread-10-EventThread] WARN backtype.storm.cluster - Received event :disconnected::none: with disconnected Zookeeper.
> 3636 [Thread-10-SendThread(localhost:2000)] WARN org.apache.zookeeper.ClientCnxn - Session 0x147a7c868ef000b for server null, unexpected error, closing socket connection and attempting reconnect
> java.net.ConnectException: Connection refused
>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_55]
>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) ~[na:1.7.0_55]
>     at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1119) ~[zookeeper-3.3.3.jar:3.3.3-1073969]
> 4877 [Thread-10-SendThread(localhost:2000)] WARN org.apache.zookeeper.ClientCnxn - Session 0x147a7c868ef000b for server null, unexpected error, closing socket connection and attempting reconnect
> java.net.ConnectException: Connection refused
>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_55]
>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) ~[na:1.7.0_55]
>     at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1119) ~[zookeeper-3.3.3.jar:3.3.3-1073969]
> 5566 [Thread-10-SendThread(localhost:2000)] WARN org.apache.zookeeper.ClientCnxn - Session 0x147a7c868ef000b for server null, unexpected error, closing socket connection and attempting reconnect
> java.net.ConnectException: Connection refused
>
> It seems it is not even connected to ZooKeeper. Is there any way to confirm the ZooKeeper connection?
>
> Thanks a lot
>
> Alec
>
> On Aug 5, 2014, at 12:58 PM, Sa Li <sa.in.v...@gmail.com> wrote:
>
>> Thank you very much for your reply, Taylor. I tried increasing the sleep time to 1 second or 10 seconds, but then I got the error below, which seems to be an "Async loop died" error. Any idea about that?
>>
>> 3053 [Thread-19-$spoutcoord-spout0] INFO org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
>> 3058 [Thread-19-$spoutcoord-spout0] ERROR backtype.storm.util - Async loop died!
>> java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
>>     at org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29) ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>     at org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:154) ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>     at org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94) ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>     at org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55) ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>     at org.apache.curator.ConnectionState.reset(ConnectionState.java:219) ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>     at org.apache.curator.ConnectionState.start(ConnectionState.java:103) ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>     at org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:190) ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>     at org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:256) ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>     at
storm.artemis.kafka.DynamicBrokersReader.<init>(DynamicBrokersReader.java:36) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.artemis.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:24) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.artemis.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:40) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.artemis.kafka.trident.Coordinator.<init>(Coordinator.java:16) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.artemis.kafka.trident.OpaqueTridentKafkaSpout.getCoordinator(OpaqueTridentKafkaSpout.java:29) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Coordinator.<init>(OpaquePartitionedTridentSpoutExecutor.java:27) >> ~[storm-core-0.9.0.1.jar:na] >> at >> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getCoordinator(OpaquePartitionedTridentSpoutExecutor.java:166) >> ~[storm-core-0.9.0.1.jar:na] >> at >> storm.trident.spout.TridentSpoutCoordinator.prepare(TridentSpoutCoordinator.java:38) >> ~[storm-core-0.9.0.1.jar:na] >> at >> backtype.storm.topology.BasicBoltExecutor.prepare(BasicBoltExecutor.java:26) >> ~[storm-core-0.9.0.1.jar:na] >> at >> backtype.storm.daemon.executor$fn__3498$fn__3510.invoke(executor.clj:674) >> ~[storm-core-0.9.0.1.jar:na] >> at backtype.storm.util$async_loop$fn__444.invoke(util.clj:401) >> ~[storm-core-0.9.0.1.jar:na] >> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na] >> at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55] >> 3058 [Thread-25-spout0] ERROR backtype.storm.util - Async loop died! 
>> java.lang.NoSuchMethodError: >> org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V >> at >> org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:154) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> org.apache.curator.ConnectionState.reset(ConnectionState.java:219) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> org.apache.curator.ConnectionState.start(ConnectionState.java:103) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:190) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:256) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.artemis.kafka.DynamicBrokersReader.<init>(DynamicBrokersReader.java:36) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.artemis.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:24) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.artemis.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:40) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.artemis.kafka.trident.TridentKafkaEmitter.<init>(TridentKafkaEmitter.java:44) >> 
~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.artemis.kafka.trident.OpaqueTridentKafkaSpout.getEmitter(OpaqueTridentKafkaSpout.java:24) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.<init>(OpaquePartitionedTridentSpoutExecutor.java:69) >> ~[storm-core-0.9.0.1.jar:na] >> at >> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:171) >> ~[storm-core-0.9.0.1.jar:na] >> at >> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:20) >> ~[storm-core-0.9.0.1.jar:na] >> at >> storm.trident.spout.TridentSpoutExecutor.prepare(TridentSpoutExecutor.java:43) >> ~[storm-core-0.9.0.1.jar:na] >> at >> storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:214) >> ~[storm-core-0.9.0.1.jar:na] >> at >> backtype.storm.daemon.executor$fn__3498$fn__3510.invoke(executor.clj:674) >> ~[storm-core-0.9.0.1.jar:na] >> at backtype.storm.util$async_loop$fn__444.invoke(util.clj:401) >> ~[storm-core-0.9.0.1.jar:na] >> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na] >> at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55] >> 3059 [Thread-19-$spoutcoord-spout0] ERROR backtype.storm.daemon.executor - >> java.lang.NoSuchMethodError: >> org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V >> at >> org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:154) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> 
at >> org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> org.apache.curator.ConnectionState.reset(ConnectionState.java:219) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> org.apache.curator.ConnectionState.start(ConnectionState.java:103) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:190) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:256) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.artemis.kafka.DynamicBrokersReader.<init>(DynamicBrokersReader.java:36) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.artemis.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:24) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.artemis.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:40) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.artemis.kafka.trident.Coordinator.<init>(Coordinator.java:16) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.artemis.kafka.trident.OpaqueTridentKafkaSpout.getCoordinator(OpaqueTridentKafkaSpout.java:29) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Coordinator.<init>(OpaquePartitionedTridentSpoutExecutor.java:27) >> ~[storm-core-0.9.0.1.jar:na] >> at >> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getCoordinator(OpaquePartitionedTridentSpoutExecutor.java:166) >> ~[storm-core-0.9.0.1.jar:na] >> at >> storm.trident.spout.TridentSpoutCoordinator.prepare(TridentSpoutCoordinator.java:38) >> ~[storm-core-0.9.0.1.jar:na] >> at 
>> backtype.storm.topology.BasicBoltExecutor.prepare(BasicBoltExecutor.java:26) >> ~[storm-core-0.9.0.1.jar:na] >> at >> backtype.storm.daemon.executor$fn__3498$fn__3510.invoke(executor.clj:674) >> ~[storm-core-0.9.0.1.jar:na] >> at backtype.storm.util$async_loop$fn__444.invoke(util.clj:401) >> ~[storm-core-0.9.0.1.jar:na] >> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na] >> at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55] >> 3059 [Thread-25-spout0] ERROR backtype.storm.daemon.executor - >> java.lang.NoSuchMethodError: >> org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V >> at >> org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:154) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> org.apache.curator.ConnectionState.reset(ConnectionState.java:219) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> org.apache.curator.ConnectionState.start(ConnectionState.java:103) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:190) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:256) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.artemis.kafka.DynamicBrokersReader.<init>(DynamicBrokersReader.java:36) >> 
~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.artemis.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:24) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.artemis.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:40) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.artemis.kafka.trident.TridentKafkaEmitter.<init>(TridentKafkaEmitter.java:44) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.artemis.kafka.trident.OpaqueTridentKafkaSpout.getEmitter(OpaqueTridentKafkaSpout.java:24) >> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >> at >> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.<init>(OpaquePartitionedTridentSpoutExecutor.java:69) >> ~[storm-core-0.9.0.1.jar:na] >> at >> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:171) >> ~[storm-core-0.9.0.1.jar:na] >> at >> storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:20) >> ~[storm-core-0.9.0.1.jar:na] >> at >> storm.trident.spout.TridentSpoutExecutor.prepare(TridentSpoutExecutor.java:43) >> ~[storm-core-0.9.0.1.jar:na] >> at >> storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:214) >> ~[storm-core-0.9.0.1.jar:na] >> at >> backtype.storm.daemon.executor$fn__3498$fn__3510.invoke(executor.clj:674) >> ~[storm-core-0.9.0.1.jar:na] >> at backtype.storm.util$async_loop$fn__444.invoke(util.clj:401) >> ~[storm-core-0.9.0.1.jar:na] >> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na] >> at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55] >> 3059 [Thread-7] INFO backtype.storm.daemon.worker - Worker has topology >> config {"storm.id" "kafka-1-1407268492", "dev.zookeeper.path" >> "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, >> 
"topology.builtin.metrics.bucket.size.secs" 60, >> "topology.fall.back.on.java.serialization" true, >> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, >> "topology.skip.missing.kryo.registrations" true, >> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", >> "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, >> "topology.trident.batch.emit.interval.millis" 50, "nimbus.monitor.freq.secs" >> 10, "logviewer.childopts" "-Xmx128m", "java.library.path" >> "/usr/lib/jvm/java-7-openjdk-amd64", "topology.executor.send.buffer.size" >> 1024, "storm.local.dir" "/tmp/ca948198-69df-440b-8acb-6dfc4db6c288", >> "storm.messaging.netty.buffer_size" 5242880, >> "supervisor.worker.start.timeout.secs" 120, >> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" >> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, >> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "10.100.70.128", >> "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, >> "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" >> 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" >> "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, >> "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, >> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" >> "/transactional", "topology.acker.executors" nil, "topology.kryo.decorators" >> (), "topology.name" "kafka", "topology.transfer.buffer.size" 1024, >> "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" >> "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, >> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, >> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", >> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, >> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, 
>> "topology.spout.wait.strategy" >> "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.max.spout.pending" >> nil, "storm.zookeeper.retry.interval" 1000, >> "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" >> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" >> (1 2 3), "topology.debug" true, "nimbus.task.launch.secs" 120, >> "nimbus.supervisor.timeout.secs" 60, "topology.kryo.register" >> {"storm.trident.topology.TransactionAttempt" nil}, >> "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, >> "topology.workers" 1, "supervisor.childopts" "-Xmx256m", >> "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, >> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" >> "backtype.storm.serialization.types.ListDelegateSerializer", >> "topology.disruptor.wait.strategy" >> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, >> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" >> "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" >> 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" >> 5, "storm.thrift.transport" >> "backtype.storm.security.auth.SimpleTransportPlugin", >> "topology.state.synchronization.timeout.secs" 60, >> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, >> "storm.messaging.transport" "backtype.storm.messaging.zmq", >> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, >> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" >> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", >> "topology.optimize" true, "topology.max.task.parallelism" nil} >> 3059 [Thread-7] INFO backtype.storm.daemon.worker - Worker >> 64335058-7f94-447f-bc0a-5107084789a0 for storm kafka-1-1407268492 on >> cf2964b3-7655-4a33-88a1-f6e0ceb6f9ed:1 has finished loading >> 3164 [Thread-29-$mastercoord-bg0] INFO >> 
com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
>> 3173 [Thread-25-spout0] INFO backtype.storm.util - Halting process: ("Worker died")
>> 3173 [Thread-19-$spoutcoord-spout0] INFO backtype.storm.util - Halting process: ("Worker died")
>>
>> Thanks
>>
>> Alec
>>
>> On Aug 5, 2014, at 10:26 AM, P. Taylor Goetz <ptgo...@gmail.com> wrote:
>>
>>> You are only sleeping for 100 milliseconds before shutting down the local cluster, which is probably not long enough for the topology to come up and start processing messages. Try increasing the sleep time to something like 10 seconds.
>>>
>>> You can also reduce startup time with the following JVM flag:
>>>
>>> -Djava.net.preferIPv4Stack=true
>>>
>>> - Taylor
>>>
>>> On Aug 5, 2014, at 1:16 PM, Sa Li <sa.in.v...@gmail.com> wrote:
>>>
>>>> Sorry, the stormTopology:
>>>>
>>>>>         TridentTopology topology = new TridentTopology();
>>>>>         BrokerHosts zk = new ZkHosts("localhost");
>>>>>         TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
>>>>>         spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>>         OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
>>>>
>>>> On Aug 5, 2014, at 9:56 AM, Sa Li <sa.in.v...@gmail.com> wrote:
>>>>
>>>>> Thank you very much, Marcelo. It did work, and I can now run my code without errors. However, another thing keeps bothering me. Here is my code:
>>>>>
>>>>> public static class PrintStream implements Filter {
>>>>>
>>>>>     @SuppressWarnings("rawtypes")
>>>>>     @Override
>>>>>     public void prepare(Map conf, TridentOperationContext context) {
>>>>>     }
>>>>>     @Override
>>>>>     public void cleanup() {
>>>>>     }
>>>>>     @Override
>>>>>     public boolean isKeep(TridentTuple tuple) {
>>>>>         System.out.println(tuple);
>>>>>         return true;
>>>>>     }
>>>>> }
>>>>> public static StormTopology buildTopology(LocalDRPC drpc) throws IOException {
>>>>>
>>>>>     TridentTopology topology = new TridentTopology();
>>>>>     BrokerHosts zk = new ZkHosts("localhost");
>>>>>     TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "ingest_test");
>>>>>     spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>>     OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
>>>>>
>>>>>     topology.newStream("kafka", spout)
>>>>>             .each(new Fields("str"), new PrintStream());
>>>>>
>>>>>     return topology.build();
>>>>> }
>>>>> public static void main(String[] args) throws Exception {
>>>>>
>>>>>     Config conf = new Config();
>>>>>     conf.setDebug(true);
>>>>>     conf.setMaxSpoutPending(1);
>>>>>     conf.setMaxTaskParallelism(3);
>>>>>     LocalDRPC drpc = new LocalDRPC();
>>>>>     LocalCluster cluster = new LocalCluster();
>>>>>     cluster.submitTopology("kafka", conf, buildTopology(drpc));
>>>>>
>>>>>     Thread.sleep(100);
>>>>>     cluster.shutdown();
>>>>> }
>>>>>
>>>>> What I expect is quite simple: print out the messages I collect from a Kafka producer playback process that is running separately. The topic is listed as:
>>>>>
>>>>> root@DO-mq-dev:/etc/kafka# bin/kafka-list-topic.sh --zookeeper localhost:2181
>>>>> topic: topictest    partition: 0    leader: 1    replicas: 1,3,2    isr: 1,3,2
>>>>> topic: topictest    partition: 1    leader: 2    replicas: 2,1,3    isr: 2,1,3
>>>>> topic: topictest    partition: 2    leader: 3    replicas: 3,2,1    isr: 3,2,1
>>>>> topic: topictest    partition: 3    leader: 1    replicas: 1,2,3    isr: 1,2,3
>>>>> topic: topictest    partition: 4    leader: 2    replicas: 2,3,1    isr: 2,3,1
>>>>>
>>>>> When I run the code, this is what I see on the screen. There seems to be no error, but no messages are printed either:
>>>>>
>>>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>>> SLF4J: Found binding in [jar:file:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>> SLF4J: Found binding in [jar:file:/home/stuser/kafkaprj/kafka-storm-bitmap/target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
>>>>> Running: java -client -Dstorm.options= -Dstorm.home=/etc/storm-0.9.0.1 >>>>> -Djava.library.path=/usr/lib/jvm/java-7-openjdk-amd64 -Dstorm.conf.file= >>>>> -cp >>>>> /etc/storm-0.9.0.1/storm-netty-0.9.0.1.jar:/etc/storm-0.9.0.1/storm-core-0.9.0.1.jar:/etc/storm-0.9.0.1/storm-console-logging-0.9.0.1.jar:/etc/storm-0.9.0.1/lib/log4j-over-slf4j-1.6.6.jar:/etc/storm-0.9.0.1/lib/commons-io-1.4.jar:/etc/storm-0.9.0.1/lib/joda-time-2.0.jar:/etc/storm-0.9.0.1/lib/tools.nrepl-0.2.3.jar:/etc/storm-0.9.0.1/lib/servlet-api-2.5.jar:/etc/storm-0.9.0.1/lib/curator-framework-1.0.1.jar:/etc/storm-0.9.0.1/lib/core.incubator-0.1.0.jar:/etc/storm-0.9.0.1/lib/jetty-6.1.26.jar:/etc/storm-0.9.0.1/lib/commons-codec-1.4.jar:/etc/storm-0.9.0.1/lib/servlet-api-2.5-20081211.jar:/etc/storm-0.9.0.1/lib/httpclient-4.1.1.jar:/etc/storm-0.9.0.1/lib/commons-exec-1.1.jar:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar:/etc/storm-0.9.0.1/lib/libthrift7-0.7.0-2.jar:/etc/storm-0.9.0.1/lib/minlog-1.2.jar:/etc/storm-0.9.0.1/lib/clojure-complete-0.2.3.jar:/etc/storm-0.9.0.1/lib/clojure-1.4.0.jar:/etc/storm-0.9.0.1/lib/asm-4.0.jar:/etc/storm-0.9.0.1/lib/mockito-all-1.9.5.jar:/etc/storm-0.9.0.1/lib/commons-fileupload-1.2.1.jar:/etc/storm-0.9.0.1/lib/clout-1.0.1.jar:/etc/storm-0.9.0.1/lib/ring-servlet-0.3.11.jar:/etc/storm-0.9.0.1/lib/ring-devel-0.3.11.jar:/etc/storm-0.9.0.1/lib/jgrapht-0.8.3.jar:/etc/storm-0.9.0.1/lib/snakeyaml-1.11.jar:/etc/storm-0.9.0.1/lib/reflectasm-1.07-shaded.jar:/etc/storm-0.9.0.1/lib/kryo-2.17.jar:/etc/storm-0.9.0.1/lib/ring-jetty-adapter-0.3.11.jar:/etc/storm-0.9.0.1/lib/compojure-1.1.3.jar:/etc/storm-0.9.0.1/lib/objenesis-1.2.jar:/etc/storm-0.9.0.1/lib/commons-logging-1.1.1.jar:/etc/storm-0.9.0.1/lib/tools.macro-0.1.0.jar:/etc/storm-0.9.0.1/lib/junit-3.8.1.jar:/etc/storm-0.9.0.1/lib/json-simple-1.1.jar:/etc/storm-0.9.0.1/lib/tools.cli-0.2.2.jar:/etc/storm-0.9.0.1/lib/curator-client-1.0.1.jar:/etc/storm-0.9.0.1/lib/jline-0.9.94.jar:/etc/storm-0.9.0.1/lib/zookeeper-3.3.3.jar:/
etc/storm-0.9.0.1/lib/guava-13.0.jar:/etc/storm-0.9.0.1/lib/commons-lang-2.5.jar:/etc/storm-0.9.0.1/lib/carbonite-1.5.0.jar:/etc/storm-0.9.0.1/lib/ring-core-1.1.5.jar:/etc/storm-0.9.0.1/lib/jzmq-2.1.0.jar:/etc/storm-0.9.0.1/lib/hiccup-0.3.6.jar:/etc/storm-0.9.0.1/lib/tools.logging-0.2.3.jar:/etc/storm-0.9.0.1/lib/kafka_2.9.2-0.8.0.jar:/etc/storm-0.9.0.1/lib/clj-stacktrace-0.2.2.jar:/etc/storm-0.9.0.1/lib/math.numeric-tower-0.0.1.jar:/etc/storm-0.9.0.1/lib/slf4j-api-1.6.5.jar:/etc/storm-0.9.0.1/lib/netty-3.6.3.Final.jar:/etc/storm-0.9.0.1/lib/disruptor-2.10.1.jar:/etc/storm-0.9.0.1/lib/jetty-util-6.1.26.jar:/etc/storm-0.9.0.1/lib/httpcore-4.1.jar:/etc/storm-0.9.0.1/lib/logback-core-1.0.6.jar:/etc/storm-0.9.0.1/lib/clj-time-0.4.1.jar:target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:/etc/storm-0.9.0.1/conf:/etc/storm-0.9.0.1/bin >>>>> >>>>> -Dstorm.jar=target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar >>>>> storm.artemis.KafkaConsumerTopology KafkaConsumerTopology >>>>> SLF4J: Class path contains multiple SLF4J bindings. >>>>> SLF4J: Found binding in >>>>> [jar:file:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>>>> SLF4J: Found binding in >>>>> [jar:file:/home/stuser/kafkaprj/kafka-storm-bitmap/target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an >>>>> explanation. 
>>>>> 1113 [main] INFO backtype.storm.zookeeper - Starting inprocess zookeeper >>>>> at port 2000 and dir /tmp/dd37d0cc-79b3-4f23-b6a5-3bcf5a9f0879 >>>>> 1216 [main] INFO backtype.storm.daemon.nimbus - Starting Nimbus with >>>>> conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", >>>>> "topology.tick.tuple.freq.secs" nil, >>>>> "topology.builtin.metrics.bucket.size.secs" 60, >>>>> "topology.fall.back.on.java.serialization" true, >>>>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, >>>>> "topology.skip.missing.kryo.registrations" true, >>>>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" >>>>> "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" >>>>> true, "topology.trident.batch.emit.interval.millis" 50, >>>>> "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", >>>>> "java.library.path" "/usr/lib/jvm/java-7-openjdk-amd64", >>>>> "topology.executor.send.buffer.size" 1024, "storm.local.dir" >>>>> "/tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9", >>>>> "storm.messaging.netty.buffer_size" 5242880, >>>>> "supervisor.worker.start.timeout.secs" 120, >>>>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" >>>>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, >>>>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" >>>>> "10.100.70.128", "storm.messaging.netty.min_wait_ms" 100, >>>>> "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, >>>>> "topology.executor.receive.buffer.size" 1024, >>>>> "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", >>>>> "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" >>>>> true, "storm.messaging.netty.server_worker_threads" 1, >>>>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" >>>>> "/transactional", "topology.acker.executors" nil, >>>>> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, >>>>> "drpc.queue.size" 128, 
"worker.childopts" "-Xmx768m", >>>>> "supervisor.heartbeat.frequency.secs" 5, >>>>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" >>>>> 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", >>>>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, >>>>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, >>>>> "topology.spout.wait.strategy" >>>>> "backtype.storm.spout.SleepSpoutWaitStrategy", >>>>> "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, >>>>> "topology.sleep.spout.wait.strategy.time.ms" 1, >>>>> "nimbus.topology.validator" >>>>> "backtype.storm.nimbus.DefaultTopologyValidator", >>>>> "supervisor.slots.ports" [6700 6701 6702 6703], "topology.debug" false, >>>>> "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, >>>>> "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, >>>>> "topology.workers" 1, "supervisor.childopts" "-Xmx256m", >>>>> "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, >>>>> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" >>>>> "backtype.storm.serialization.types.ListDelegateSerializer", >>>>> "topology.disruptor.wait.strategy" >>>>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, >>>>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" >>>>> "backtype.storm.serialization.DefaultKryoFactory", >>>>> "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, >>>>> "storm.zookeeper.retry.times" 5, "storm.thrift.transport" >>>>> "backtype.storm.security.auth.SimpleTransportPlugin", >>>>> "topology.state.synchronization.timeout.secs" 60, >>>>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" >>>>> 600, "storm.messaging.transport" "backtype.storm.messaging.zmq", >>>>> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, >>>>> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 
>>>>> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", >>>>> "topology.optimize" true, "topology.max.task.parallelism" nil} >>>>> 1219 [main] INFO backtype.storm.daemon.nimbus - Using default scheduler >>>>> 1237 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl >>>>> - Starting >>>>> 1303 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state >>>>> update: :connected:none >>>>> 1350 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl >>>>> - Starting >>>>> 1417 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl >>>>> - Starting >>>>> 1432 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state >>>>> update: :connected:none >>>>> 1482 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl >>>>> - Starting >>>>> 1484 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl >>>>> - Starting >>>>> 1532 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state >>>>> update: :connected:none >>>>> 1540 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl >>>>> - Starting >>>>> 1568 [main] INFO backtype.storm.daemon.supervisor - Starting Supervisor >>>>> with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", >>>>> "topology.tick.tuple.freq.secs" nil, >>>>> "topology.builtin.metrics.bucket.size.secs" 60, >>>>> "topology.fall.back.on.java.serialization" true, >>>>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, >>>>> "topology.skip.missing.kryo.registrations" true, >>>>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" >>>>> "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" >>>>> true, "topology.trident.batch.emit.interval.millis" 50, >>>>> "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", >>>>> "java.library.path" "/usr/lib/jvm/java-7-openjdk-amd64", >>>>> "topology.executor.send.buffer.size" 1024, "storm.local.dir" >>>>> 
"/tmp/3e515769-ebf5-4085-a6bf-35f4ad8be388", >>>>> "storm.messaging.netty.buffer_size" 5242880, >>>>> "supervisor.worker.start.timeout.secs" 120, >>>>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" >>>>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, >>>>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" >>>>> "10.100.70.128", "storm.messaging.netty.min_wait_ms" 100, >>>>> "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, >>>>> "topology.executor.receive.buffer.size" 1024, >>>>> "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", >>>>> "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" >>>>> true, "storm.messaging.netty.server_worker_threads" 1, >>>>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" >>>>> "/transactional", "topology.acker.executors" nil, >>>>> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, >>>>> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", >>>>> "supervisor.heartbeat.frequency.secs" 5, >>>>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" >>>>> 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", >>>>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, >>>>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, >>>>> "topology.spout.wait.strategy" >>>>> "backtype.storm.spout.SleepSpoutWaitStrategy", >>>>> "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, >>>>> "topology.sleep.spout.wait.strategy.time.ms" 1, >>>>> "nimbus.topology.validator" >>>>> "backtype.storm.nimbus.DefaultTopologyValidator", >>>>> "supervisor.slots.ports" (1 2 3), "topology.debug" false, >>>>> "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, >>>>> "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, >>>>> "topology.workers" 1, "supervisor.childopts" "-Xmx256m", >>>>> 
"nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, >>>>> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" >>>>> "backtype.storm.serialization.types.ListDelegateSerializer", >>>>> "topology.disruptor.wait.strategy" >>>>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, >>>>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" >>>>> "backtype.storm.serialization.DefaultKryoFactory", >>>>> "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, >>>>> "storm.zookeeper.retry.times" 5, "storm.thrift.transport" >>>>> "backtype.storm.security.auth.SimpleTransportPlugin", >>>>> "topology.state.synchronization.timeout.secs" 60, >>>>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" >>>>> 600, "storm.messaging.transport" "backtype.storm.messaging.zmq", >>>>> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, >>>>> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" >>>>> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", >>>>> "topology.optimize" true, "topology.max.task.parallelism" nil} >>>>> 1576 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl >>>>> - Starting >>>>> 1582 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state >>>>> update: :connected:none >>>>> 1590 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl >>>>> - Starting >>>>> 1632 [main] INFO backtype.storm.daemon.supervisor - Starting supervisor >>>>> with id 944e6152-ca58-4d2b-8325-94ac98f43995 at host DO-mq-dev >>>>> 1636 [main] INFO backtype.storm.daemon.supervisor - Starting Supervisor >>>>> with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", >>>>> "topology.tick.tuple.freq.secs" nil, >>>>> "topology.builtin.metrics.bucket.size.secs" 60, >>>>> "topology.fall.back.on.java.serialization" true, >>>>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, >>>>> 
"topology.skip.missing.kryo.registrations" true, >>>>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" >>>>> "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" >>>>> true, "topology.trident.batch.emit.interval.millis" 50, >>>>> "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", >>>>> "java.library.path" "/usr/lib/jvm/java-7-openjdk-amd64", >>>>> "topology.executor.send.buffer.size" 1024, "storm.local.dir" >>>>> "/tmp/d0aeb5f4-0830-4efd-be7f-bc40d5b66912", >>>>> "storm.messaging.netty.buffer_size" 5242880, >>>>> "supervisor.worker.start.timeout.secs" 120, >>>>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" >>>>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, >>>>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" >>>>> "10.100.70.128", "storm.messaging.netty.min_wait_ms" 100, >>>>> "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, >>>>> "topology.executor.receive.buffer.size" 1024, >>>>> "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", >>>>> "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" >>>>> true, "storm.messaging.netty.server_worker_threads" 1, >>>>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" >>>>> "/transactional", "topology.acker.executors" nil, >>>>> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, >>>>> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", >>>>> "supervisor.heartbeat.frequency.secs" 5, >>>>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" >>>>> 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", >>>>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, >>>>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, >>>>> "topology.spout.wait.strategy" >>>>> "backtype.storm.spout.SleepSpoutWaitStrategy", >>>>> "topology.max.spout.pending" nil, 
"storm.zookeeper.retry.interval" 1000, >>>>> "topology.sleep.spout.wait.strategy.time.ms" 1, >>>>> "nimbus.topology.validator" >>>>> "backtype.storm.nimbus.DefaultTopologyValidator", >>>>> "supervisor.slots.ports" (4 5 6), "topology.debug" false, >>>>> "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, >>>>> "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, >>>>> "topology.workers" 1, "supervisor.childopts" "-Xmx256m", >>>>> "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, >>>>> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" >>>>> "backtype.storm.serialization.types.ListDelegateSerializer", >>>>> "topology.disruptor.wait.strategy" >>>>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, >>>>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" >>>>> "backtype.storm.serialization.DefaultKryoFactory", >>>>> "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, >>>>> "storm.zookeeper.retry.times" 5, "storm.thrift.transport" >>>>> "backtype.storm.security.auth.SimpleTransportPlugin", >>>>> "topology.state.synchronization.timeout.secs" 60, >>>>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" >>>>> 600, "storm.messaging.transport" "backtype.storm.messaging.zmq", >>>>> "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, >>>>> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" >>>>> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", >>>>> "topology.optimize" true, "topology.max.task.parallelism" nil} >>>>> 1638 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl >>>>> - Starting >>>>> 1648 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state >>>>> update: :connected:none >>>>> 1690 [main] INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl >>>>> - Starting >>>>> 1740 [main] INFO backtype.storm.daemon.supervisor - Starting supervisor 
>>>>> with id e8303ca7-9cc4-4551-8387-7559fc3c53fc at host DO-mq-dev >>>>> 1944 [main] INFO backtype.storm.daemon.nimbus - Received topology >>>>> submission for kafka with conf {"topology.max.task.parallelism" nil, >>>>> "topology.acker.executors" nil, "topology.kryo.register" >>>>> {"storm.trident.topology.TransactionAttempt" nil}, >>>>> "topology.kryo.decorators" (), "topology.name" "kafka", "storm.id" >>>>> "kafka-1-1407257070", "topology.debug" true} >>>>> 1962 [main] INFO backtype.storm.daemon.nimbus - Activating kafka: >>>>> kafka-1-1407257070 >>>>> 2067 [main] INFO backtype.storm.scheduler.EvenScheduler - Available >>>>> slots: (["944e6152-ca58-4d2b-8325-94ac98f43995" 1] >>>>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 2] >>>>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 3] >>>>> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 4] >>>>> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 5] >>>>> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 6]) >>>>> 2088 [main] INFO backtype.storm.daemon.nimbus - Setting new assignment >>>>> for topology id kafka-1-1407257070: >>>>> #backtype.storm.daemon.common.Assignment{:master-code-dir >>>>> "/tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9/nimbus/stormdist/kafka-1-1407257070", >>>>> :node->host {"944e6152-ca58-4d2b-8325-94ac98f43995" "DO-mq-dev"}, >>>>> :executor->node+port {[3 3] ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], >>>>> [5 5] ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [4 4] >>>>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [2 2] >>>>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [1 1] >>>>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1]}, :executor->start-time-secs >>>>> {[1 1] 1407257070, [2 2] 1407257070, [4 4] 1407257070, [5 5] 1407257070, >>>>> [3 3] 1407257070}} >>>>> 2215 [main] INFO backtype.storm.daemon.nimbus - Shutting down master >>>>> 2223 [main] INFO backtype.storm.daemon.nimbus - Shut down master >>>>> 2239 [main] INFO backtype.storm.daemon.supervisor - Shutting down >>>>> supervisor 944e6152-ca58-4d2b-8325-94ac98f43995 >>>>> 2240 
[Thread-6] INFO backtype.storm.event - Event manager interrupted >>>>> 2241 [Thread-7] INFO backtype.storm.event - Event manager interrupted >>>>> 2248 [main] INFO backtype.storm.daemon.supervisor - Shutting down >>>>> supervisor e8303ca7-9cc4-4551-8387-7559fc3c53fc >>>>> 2248 [Thread-9] INFO backtype.storm.event - Event manager interrupted >>>>> 2248 [Thread-10] INFO backtype.storm.event - Event manager interrupted >>>>> 2256 [main] INFO backtype.storm.testing - Shutting down in process >>>>> zookeeper >>>>> 2257 [main] INFO backtype.storm.testing - Done shutting down in process >>>>> zookeeper >>>>> 2258 [main] INFO backtype.storm.testing - Deleting temporary path >>>>> /tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9 >>>>> 2259 [main] INFO backtype.storm.testing - Deleting temporary path >>>>> /tmp/dd37d0cc-79b3-4f23-b6a5-3bcf5a9f0879 >>>>> 2260 [main] INFO backtype.storm.testing - Deleting temporary path >>>>> /tmp/3e515769-ebf5-4085-a6bf-35f4ad8be388 >>>>> 2261 [main] INFO backtype.storm.testing - Deleting temporary path >>>>> /tmp/d0aeb5f4-0830-4efd-be7f-bc40d5b66912 >>>>> >>>>> Can anyone help me locate the problem? I really need to get past >>>>> this step so that I can replace .each(printStream()) >>>>> with other functions. >>>>> >>>>> >>>>> Thanks >>>>> >>>>> Alec >>>>> >>>>> On Aug 4, 2014, at 4:24 AM, Marcelo Valle <mva...@redoop.org> wrote: >>>>> >>>>>> Hello, >>>>>> >>>>>> You can check your application .jar with the command "jar tf" to see whether >>>>>> the class kafka/api/OffsetRequest.class is part of the jar. 
>>>>>> If not, you can try copying kafka_2.9.2-0.8.0.jar (or the version you are >>>>>> using) into the storm_lib directory. >>>>>> >>>>>> Marcelo >>>>>> >>>>>> >>>>>> 2014-07-31 23:33 GMT+02:00 Sa Li <sa.in.v...@gmail.com>: >>>>>> Hi, all >>>>>> >>>>>> I am running Kafka spout code on a Storm server. The pom is >>>>>> >>>>>> <dependency> >>>>>> <groupId>org.apache.kafka</groupId> >>>>>> <artifactId>kafka_2.9.2</artifactId> >>>>>> <version>0.8.0</version> >>>>>> <scope>provided</scope> >>>>>> >>>>>> <exclusions> >>>>>> <exclusion> >>>>>> <groupId>org.apache.zookeeper</groupId> >>>>>> <artifactId>zookeeper</artifactId> >>>>>> </exclusion> >>>>>> <exclusion> >>>>>> <groupId>log4j</groupId> >>>>>> <artifactId>log4j</artifactId> >>>>>> </exclusion> >>>>>> </exclusions> >>>>>> >>>>>> </dependency> >>>>>> >>>>>> <!-- Storm-Kafka compiled --> >>>>>> >>>>>> <dependency> >>>>>> <artifactId>storm-kafka</artifactId> >>>>>> <groupId>org.apache.storm</groupId> >>>>>> <version>0.9.2-incubating</version> >>>>>> <scope>compile</scope> >>>>>> </dependency> >>>>>> >>>>>> I can mvn package it, but when I run it >>>>>> root@DO-mq-dev:/home/stuser/kafkaprj/kafka-storm-bitmap# storm jar >>>>>> target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar >>>>>> storm.artemis.KafkaConsumerTopology KafkaConsumerTopology >>>>>> >>>>>> >>>>>> I get this error: >>>>>> >>>>>> 1657 [main] INFO >>>>>> com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting >>>>>> 1682 [main] INFO backtype.storm.daemon.supervisor - Starting supervisor >>>>>> with id a66e0c61-a951-4c1b-a43f-3fb0d12cb226 at host DO-mq-dev >>>>>> 1698 [main] ERROR org.apache.zookeeper.server.NIOServerCnxn - Thread >>>>>> Thread[main,5,main] died >>>>>> java.lang.NoClassDefFoundError: kafka/api/OffsetRequest >>>>>> at storm.artemis.kafka.KafkaConfig.<init>(KafkaConfig.java:26) >>>>>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >>>>>> at >>>>>> 
storm.artemis.kafka.trident.TridentKafkaConfig.<init>(TridentKafkaConfig.java:13) >>>>>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >>>>>> at >>>>>> storm.artemis.KafkaConsumerTopology.buildTopology(KafkaConsumerTopology.java:115) >>>>>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >>>>>> at >>>>>> storm.artemis.KafkaConsumerTopology.main(KafkaConsumerTopology.java:144) >>>>>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >>>>>> Caused by: java.lang.ClassNotFoundException: kafka.api.OffsetRequest >>>>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366) ~[na:1.7.0_55] >>>>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355) ~[na:1.7.0_55] >>>>>> at java.security.AccessController.doPrivileged(Native Method) >>>>>> ~[na:1.7.0_55] >>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354) >>>>>> ~[na:1.7.0_55] >>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425) ~[na:1.7.0_55] >>>>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) >>>>>> ~[na:1.7.0_55] >>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ~[na:1.7.0_55] >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> I poked around online but could not find a solution. Any ideas? >>>>>> >>>>>> >>>>>> Thanks >>>>>> >>>>>> Alec >>>>>> >>>>>> >>>>>> >>>>>> >>>>> >>>> >>> >> >
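The "jar tf" check Marcelo suggests can also be done from Java. This is a minimal sketch and was not posted in the thread: the class name JarEntryCheck and the method containsClass are hypothetical, and it uses only java.util.jar.JarFile from the JDK. If the class turns out to be missing, one plausible cause (an assumption, not confirmed by anyone in the thread) is the <scope>provided</scope> on the kafka dependency, because provided-scope dependencies are normally left out of a jar-with-dependencies assembly.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.jar.JarFile;

// Hypothetical helper: reports whether a jar contains the .class entry
// for a given fully qualified class name (same idea as "jar tf | grep").
public class JarEntryCheck {

    // Converts e.g. "kafka.api.OffsetRequest" to "kafka/api/OffsetRequest.class"
    // and looks that entry up in the jar.
    static boolean containsClass(Path jar, String className) throws IOException {
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            return jarFile.getEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: java JarEntryCheck <path-to-jar> <class-name>");
            System.exit(2);
        }
        boolean found = containsClass(Paths.get(args[0]), args[1]);
        System.out.println(args[1] + (found ? " is present in " : " is missing from ") + args[0]);
    }
}
```

For the jar in this thread, it could be run as: java JarEntryCheck target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar kafka.api.OffsetRequest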