Can you set zkPort in SpoutConfig to 2181 in your topology builder and see if that helps?
-- Kushan Maskey 817.403.7500 On Wed, Aug 6, 2014 at 2:34 PM, Sa Li <sa.in.v...@gmail.com> wrote: > Hi, Kushan > > You are completely right, I noticed this after you mentioned it, > apparently I am able to consumer the messages by kafka-console-consumer.sh > which listen to 2181, but storm goes to 2000 instead. > > 1319 [main] INFO backtype.storm.zookeeper - Starting inprocess zookeeper > at port 2000 and dir /tmp/f41ad971-9f6b-433f-9dc9-9797afcc2e46 > 1425 [main] INFO backtype.storm.daemon.nimbus - Starting Nimbus with conf > {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", > > I spent whole morning to walk through my configuration, this is the zoo.cfg > > # The number of milliseconds of each tick > tickTime=2000 > # The number of ticks that the initial > # synchronization phase can take > initLimit=5 > # The number of ticks that can pass between > # sending a request and getting an acknowledgement > syncLimit=2 > # the directory where the snapshot is stored. > dataDir=/var/lib/zookeeper > # Place the dataLogDir to a separate physical disc for better performance > # dataLogDir=/disk2/zookeeper > # the port at which the clients will connect > clientPort=2181 > # specify all zookeeper servers > # The fist port is used by followers to connect to the leader > # The second one is used for leader election > #server.1=zookeeper1:2888:3888 > #server.2=zookeeper2:2888:3888 > #server.3=zookeeper3:2888:3888 > > # To avoid seeks ZooKeeper allocates space in the transaction log file in > # blocks of preAllocSize kilobytes. The default block size is 64M. One > reason > # for changing the size of the blocks is to reduce the block size if > snapshots > # are taken more often. (Also, see snapCount). > #preAllocSize=65536 > # Clients can submit requests faster than ZooKeeper can process them, > # especially if there are a lot of clients. To prevent ZooKeeper from > running > # out of memory due to queued requests, ZooKeeper will throttle clients so > that > # there is no more than globalOutstandingLimit outstanding requests in the > # system. The default limit is 1,000.ZooKeeper logs transactions to a > # transaction log. After snapCount transactions are written to a log file a > # snapshot is started and a new transaction log file is started. The > default > # snapCount is 10,000. > #snapCount=1000 > > # If this option is defined, requests will be will logged to a trace file > named > # traceFile.year.month.day. > #traceFile= > # Leader accepts client connections. Default value is "yes". The leader > machine > # coordinates updates. For higher update throughput at thes slight expense > of > # read throughput the leader can be configured to not accept clients and > focus > # on coordination. > leaderServes=yes > # Enable regular purging of old data and transaction logs every 24 hours > autopurge.purgeInterval=24 > autopurge.snapRetainCount=5 > > Only thing that I thought to change was to make "multi-server" setup, > uncomment the server.1, server.2, server.3, but didn't help. And this is > the storm.yaml sitting in ~/.storm > > storm.zookeeper.servers: > - "10.100.70.128" > # - "server2" > storm.zookeeper.port: 2181 > nimbus.host: "10.100.70.128" > nimbus.childopts: "-Xmx1024m" > storm.local.dir: "/app/storm" > java.library.path: "/usr/lib/jvm/java-7-openjdk-amd64" > supervisor.slots.ports: > - 6700 > - 6701 > - 6702 > - 6703 > # ##### These may optionally be filled in: > # > ## List of custom serializations > # topology.kryo.register: > # - org.mycompany.MyType > # - org.mycompany.MyType2: org.mycompany.MyType2Serializer > # > ## List of custom kryo decorators > # topology.kryo.decorators: > # - org.mycompany.MyDecorator > # > ## Locations of the drpc servers > drpc.servers: > - "10.100.70.128" > # - "server2" > drpc.port: 3772 > drpc.worker.threads: 64 > drpc.queue.size: 128 > drpc.invocations.port: 3773 > drpc.request.timeout.secs: 600 > drpc.childopts: "-Xmx768m" > ## Metrics Consumers > # topology.metrics.consumer.register: > # - class: "backtype.storm.metrics.LoggingMetricsConsumer" > # parallelism.hint: 1 > # - class: "org.mycompany.MyMetricsConsumer" > # parallelism.hint: 1 > # argument: > # - endpoint: "metrics-collector.mycompany.org" > > I really couldn't figure out what is trick to configure zK and storm > cluster, and why zookeeper listen to 2000 which is really a weird thing. > > thanks > > Alec > > > > On Wed, Aug 6, 2014 at 6:48 AM, Kushan Maskey < > kushan.mas...@mmillerassociates.com> wrote: > >> I see that your zookeeper is listening on port 2000. Is that how you have >> configured the zookeeper? >> >> -- >> Kushan Maskey >> 817.403.7500 >> >> >> On Tue, Aug 5, 2014 at 11:56 AM, Sa Li <sa.in.v...@gmail.com> wrote: >> >>> Thank you very much, Marcelo, it indeed worked, now I can run my code >>> without getting error. However, another thing is keeping bother me, >>> following is my code: >>> >>> public static class PrintStream implements Filter { >>> >>> @SuppressWarnings("rawtypesā€¯) >>> @Override >>> public void prepare(Map conf, TridentOperationContext context) { >>> } >>> @Override >>> public void cleanup() { >>> } >>> @Override >>> public boolean isKeep(TridentTuple tuple) { >>> System.out.println(tuple); >>> return true; >>> } >>> } >>> public static StormTopology buildTopology(LocalDRPC drpc) throws >>> IOException >>> { >>> >>> TridentTopology topology = new TridentTopology(); >>> BrokerHosts zk = new ZkHosts("localhost"); >>> TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, >>> "ingest_test"); >>> spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); >>> OpaqueTridentKafkaSpout spout = new >>> OpaqueTridentKafkaSpout(spoutConf); >>> >>> topology.newStream("kafka", spout) >>> .each(new Fields("str"), >>> new PrintStream() >>> ); >>> >>> return topology.build(); >>> } >>> public static void main(String[] args) throws Exception { >>> >>> Config conf = new Config(); >>> conf.setDebug(true); >>> conf.setMaxSpoutPending(1); >>> conf.setMaxTaskParallelism(3); >>> LocalDRPC drpc = new LocalDRPC(); >>> LocalCluster cluster = new LocalCluster(); >>> cluster.submitTopology("kafka", conf, buildTopology(drpc)); >>> >>> Thread.sleep(100); >>> cluster.shutdown(); >>> } >>> >>> What I expect is quite simple, print out the message I collect from a >>> kafka producer playback process which is running separately. The topic is >>> listed as: >>> >>> root@DO-mq-dev:/etc/kafka# bin/kafka-list-topic.sh --zookeeper >>> localhost:2181 >>> topic: topictest partition: 0 leader: 1 replicas: 1,3,2 >>> isr: 1,3,2 >>> topic: topictest partition: 1 leader: 2 replicas: 2,1,3 >>> isr: 2,1,3 >>> topic: topictest partition: 2 leader: 3 replicas: 3,2,1 >>> isr: 3,2,1 >>> topic: topictest partition: 3 leader: 1 replicas: 1,2,3 >>> isr: 1,2,3 >>> topic: topictest partition: 4 leader: 2 replicas: 2,3,1 >>> isr: 2,3,1 >>> >>> When I am running the code, this is what I saw on the screen, seems no >>> error, but no message print out as well: >>> >>> SLF4J: Class path contains multiple SLF4J bindings. >>> SLF4J: Found binding in >>> [jar:file:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>> SLF4J: Found binding in >>> [jar:file:/home/stuser/kafkaprj/kafka-storm-bitmap/target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an >>> explanation. >>> Running: java -client -Dstorm.options= -Dstorm.home=/etc/storm-0.9.0.1 >>> -Djava.library.path=/usr/lib/jvm/java-7-openjdk-amd64 -Dstorm.conf.file= >>> -cp >>> /etc/storm-0.9.0.1/storm-netty-0.9.0.1.jar:/etc/storm-0.9.0.1/storm-core-0.9.0.1.jar:/etc/storm-0.9.0.1/storm-console-logging-0.9.0.1.jar:/etc/storm-0.9.0.1/lib/log4j-over-slf4j-1.6.6.jar:/etc/storm-0.9.0.1/lib/commons-io-1.4.jar:/etc/storm-0.9.0.1/lib/joda-time-2.0.jar:/etc/storm-0.9.0.1/lib/tools.nrepl-0.2.3.jar:/etc/storm-0.9.0.1/lib/servlet-api-2.5.jar:/etc/storm-0.9.0.1/lib/curator-framework-1.0.1.jar:/etc/storm-0.9.0.1/lib/core.incubator-0.1.0.jar:/etc/storm-0.9.0.1/lib/jetty-6.1.26.jar:/etc/storm-0.9.0.1/lib/commons-codec-1.4.jar:/etc/storm-0.9.0.1/lib/servlet-api-2.5-20081211.jar:/etc/storm-0.9.0.1/lib/httpclient-4.1.1.jar:/etc/storm-0.9.0.1/lib/commons-exec-1.1.jar:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar:/etc/storm-0.9.0.1/lib/libthrift7-0.7.0-2.jar:/etc/storm-0.9.0.1/lib/minlog-1.2.jar:/etc/storm-0.9.0.1/lib/clojure-complete-0.2.3.jar:/etc/storm-0.9.0.1/lib/clojure-1.4.0.jar:/etc/storm-0.9.0.1/lib/asm-4.0.jar:/etc/storm-0.9.0.1/lib/mockito-all-1.9.5.jar:/etc/storm-0.9.0.1/lib/commons-fileupload-1.2.1.jar:/etc/storm-0.9.0.1/lib/clout-1.0.1.jar:/etc/storm-0.9.0.1/lib/ring-servlet-0.3.11.jar:/etc/storm-0.9.0.1/lib/ring-devel-0.3.11.jar:/etc/storm-0.9.0.1/lib/jgrapht-0.8.3.jar:/etc/storm-0.9.0.1/lib/snakeyaml-1.11.jar:/etc/storm-0.9.0.1/lib/reflectasm-1.07-shaded.jar:/etc/storm-0.9.0.1/lib/kryo-2.17.jar:/etc/storm-0.9.0.1/lib/ring-jetty-adapter-0.3.11.jar:/etc/storm-0.9.0.1/lib/compojure-1.1.3.jar:/etc/storm-0.9.0.1/lib/objenesis-1.2.jar:/etc/storm-0.9.0.1/lib/commons-logging-1.1.1.jar:/etc/storm-0.9.0.1/lib/tools.macro-0.1.0.jar:/etc/storm-0.9.0.1/lib/junit-3.8.1.jar:/etc/storm-0.9.0.1/lib/json-simple-1.1.jar:/etc/storm-0.9.0.1/lib/tools.cli-0.2.2.jar:/etc/storm-0.9.0.1/lib/curator-client-1.0.1.jar:/etc/storm-0.9.0.1/lib/jline-0.9.94.jar:/etc/storm-0.9.0.1/lib/zookeeper-3.3.3.jar:/etc/storm-0.9.0.1/lib/guava-13.0.jar:/etc/storm-0.9.0.1/lib/commons-lang-2.5.jar:/etc/storm-0.9.0.1/lib/carbonite-1.5.0.jar:/etc/storm-0.9.0.1/lib/ring-core-1.1.5.jar:/etc/storm-0.9.0.1/lib/jzmq-2.1.0.jar:/etc/storm-0.9.0.1/lib/hiccup-0.3.6.jar:/etc/storm-0.9.0.1/lib/tools.logging-0.2.3.jar:/etc/storm-0.9.0.1/lib/kafka_2.9.2-0.8.0.jar:/etc/storm-0.9.0.1/lib/clj-stacktrace-0.2.2.jar:/etc/storm-0.9.0.1/lib/math.numeric-tower-0.0.1.jar:/etc/storm-0.9.0.1/lib/slf4j-api-1.6.5.jar:/etc/storm-0.9.0.1/lib/netty-3.6.3.Final.jar:/etc/storm-0.9.0.1/lib/disruptor-2.10.1.jar:/etc/storm-0.9.0.1/lib/jetty-util-6.1.26.jar:/etc/storm-0.9.0.1/lib/httpcore-4.1.jar:/etc/storm-0.9.0.1/lib/logback-core-1.0.6.jar:/etc/storm-0.9.0.1/lib/clj-time-0.4.1.jar:target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:/etc/storm-0.9.0.1/conf:/etc/storm-0.9.0.1/bin >>> -Dstorm.jar=target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar >>> storm.artemis.KafkaConsumerTopology KafkaConsumerTopology >>> SLF4J: Class path contains multiple SLF4J bindings. >>> SLF4J: Found binding in >>> [jar:file:/etc/storm-0.9.0.1/lib/logback-classic-1.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>> SLF4J: Found binding in >>> [jar:file:/home/stuser/kafkaprj/kafka-storm-bitmap/target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class] >>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an >>> explanation. >>> 1113 [main] INFO backtype.storm.zookeeper - Starting inprocess >>> zookeeper at port 2000 and dir /tmp/dd37d0cc-79b3-4f23-b6a5-3bcf5a9f0879 >>> 1216 [main] INFO backtype.storm.daemon.nimbus - Starting Nimbus with >>> conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", >>> "topology.tick.tuple.freq.secs" nil, >>> "topology.builtin.metrics.bucket.size.secs" 60, >>> "topology.fall.back.on.java.serialization" true, >>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, >>> "topology.skip.missing.kryo.registrations" true, >>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", >>> "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, >>> "topology.trident.batch.emit.interval.millis" 50, >>> "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", >>> "java.library.path" "/usr/lib/jvm/java-7-openjdk-amd64", >>> "topology.executor.send.buffer.size" 1024, "storm.local.dir" >>> "/tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9", >>> "storm.messaging.netty.buffer_size" 5242880, >>> "supervisor.worker.start.timeout.secs" 120, >>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" >>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, >>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "10.100.70.128", >>> "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, >>> "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" >>> 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" >>> "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, >>> "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, >>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" >>> "/transactional", "topology.acker.executors" nil, >>> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, >>> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", >>> "supervisor.heartbeat.frequency.secs" 5, >>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, >>> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", >>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, >>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, >>> "topology.spout.wait.strategy" >>> "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.max.spout.pending" >>> nil, "storm.zookeeper.retry.interval" 1000, " >>> topology.sleep.spout.wait.strategy.time.ms" 1, >>> "nimbus.topology.validator" >>> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" >>> [6700 6701 6702 6703], "topology.debug" false, "nimbus.task.launch.secs" >>> 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" >>> 30, "task.refresh.poll.secs" 10, "topology.workers" 1, >>> "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, >>> "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, >>> "topology.tuple.serializer" >>> "backtype.storm.serialization.types.ListDelegateSerializer", >>> "topology.disruptor.wait.strategy" >>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, >>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" >>> "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" >>> 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" >>> 5, "storm.thrift.transport" >>> "backtype.storm.security.auth.SimpleTransportPlugin", >>> "topology.state.synchronization.timeout.secs" 60, >>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" >>> 600, "storm.messaging.transport" "backtype.storm.messaging.zmq", " >>> logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" >>> 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, >>> "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" >>> "local", "topology.optimize" true, "topology.max.task.parallelism" nil} >>> 1219 [main] INFO backtype.storm.daemon.nimbus - Using default scheduler >>> 1237 [main] INFO >>> com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting >>> 1303 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state >>> update: :connected:none >>> 1350 [main] INFO >>> com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting >>> 1417 [main] INFO >>> com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting >>> 1432 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state >>> update: :connected:none >>> 1482 [main] INFO >>> com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting >>> 1484 [main] INFO >>> com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting >>> 1532 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state >>> update: :connected:none >>> 1540 [main] INFO >>> com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting >>> 1568 [main] INFO backtype.storm.daemon.supervisor - Starting Supervisor >>> with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", >>> "topology.tick.tuple.freq.secs" nil, >>> "topology.builtin.metrics.bucket.size.secs" 60, >>> "topology.fall.back.on.java.serialization" true, >>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, >>> "topology.skip.missing.kryo.registrations" true, >>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", >>> "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, >>> "topology.trident.batch.emit.interval.millis" 50, >>> "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", >>> "java.library.path" "/usr/lib/jvm/java-7-openjdk-amd64", >>> "topology.executor.send.buffer.size" 1024, "storm.local.dir" >>> "/tmp/3e515769-ebf5-4085-a6bf-35f4ad8be388", >>> "storm.messaging.netty.buffer_size" 5242880, >>> "supervisor.worker.start.timeout.secs" 120, >>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" >>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, >>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "10.100.70.128", >>> "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, >>> "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" >>> 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" >>> "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, >>> "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, >>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" >>> "/transactional", "topology.acker.executors" nil, >>> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, >>> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", >>> "supervisor.heartbeat.frequency.secs" 5, >>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, >>> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", >>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, >>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, >>> "topology.spout.wait.strategy" >>> "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.max.spout.pending" >>> nil, "storm.zookeeper.retry.interval" 1000, " >>> topology.sleep.spout.wait.strategy.time.ms" 1, >>> "nimbus.topology.validator" >>> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" >>> (1 2 3), "topology.debug" false, "nimbus.task.launch.secs" 120, >>> "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, >>> "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" >>> "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, >>> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" >>> "backtype.storm.serialization.types.ListDelegateSerializer", >>> "topology.disruptor.wait.strategy" >>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, >>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" >>> "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" >>> 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" >>> 5, "storm.thrift.transport" >>> "backtype.storm.security.auth.SimpleTransportPlugin", >>> "topology.state.synchronization.timeout.secs" 60, >>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" >>> 600, "storm.messaging.transport" "backtype.storm.messaging.zmq", " >>> logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" >>> 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, >>> "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" >>> "local", "topology.optimize" true, "topology.max.task.parallelism" nil} >>> 1576 [main] INFO >>> com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting >>> 1582 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state >>> update: :connected:none >>> 1590 [main] INFO >>> com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting >>> 1632 [main] INFO backtype.storm.daemon.supervisor - Starting supervisor >>> with id 944e6152-ca58-4d2b-8325-94ac98f43995 at host DO-mq-dev >>> 1636 [main] INFO backtype.storm.daemon.supervisor - Starting Supervisor >>> with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", >>> "topology.tick.tuple.freq.secs" nil, >>> "topology.builtin.metrics.bucket.size.secs" 60, >>> "topology.fall.back.on.java.serialization" true, >>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, >>> "topology.skip.missing.kryo.registrations" true, >>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", >>> "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, >>> "topology.trident.batch.emit.interval.millis" 50, >>> "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", >>> "java.library.path" "/usr/lib/jvm/java-7-openjdk-amd64", >>> "topology.executor.send.buffer.size" 1024, "storm.local.dir" >>> "/tmp/d0aeb5f4-0830-4efd-be7f-bc40d5b66912", >>> "storm.messaging.netty.buffer_size" 5242880, >>> "supervisor.worker.start.timeout.secs" 120, >>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" >>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, >>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "10.100.70.128", >>> "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, >>> "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" >>> 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" >>> "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, >>> "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, >>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" >>> "/transactional", "topology.acker.executors" nil, >>> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, >>> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", >>> "supervisor.heartbeat.frequency.secs" 5, >>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, >>> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", >>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, >>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, >>> "topology.spout.wait.strategy" >>> "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.max.spout.pending" >>> nil, "storm.zookeeper.retry.interval" 1000, " >>> topology.sleep.spout.wait.strategy.time.ms" 1, >>> "nimbus.topology.validator" >>> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" >>> (4 5 6), "topology.debug" false, "nimbus.task.launch.secs" 120, >>> "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, >>> "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" >>> "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, >>> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" >>> "backtype.storm.serialization.types.ListDelegateSerializer", >>> "topology.disruptor.wait.strategy" >>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30, >>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" >>> "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" >>> 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" >>> 5, "storm.thrift.transport" >>> "backtype.storm.security.auth.SimpleTransportPlugin", >>> "topology.state.synchronization.timeout.secs" 60, >>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" >>> 600, "storm.messaging.transport" "backtype.storm.messaging.zmq", " >>> logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" >>> 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, >>> "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" >>> "local", "topology.optimize" true, "topology.max.task.parallelism" nil} >>> 1638 [main] INFO >>> com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting >>> 1648 [main-EventThread] INFO backtype.storm.zookeeper - Zookeeper state >>> update: :connected:none >>> 1690 [main] INFO >>> com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting >>> 1740 [main] INFO backtype.storm.daemon.supervisor - Starting supervisor >>> with id e8303ca7-9cc4-4551-8387-7559fc3c53fc at host DO-mq-dev >>> 1944 [main] INFO backtype.storm.daemon.nimbus - Received topology >>> submission for kafka with conf {"topology.max.task.parallelism" nil, >>> "topology.acker.executors" nil, "topology.kryo.register" >>> {"storm.trident.topology.TransactionAttempt" nil}, >>> "topology.kryo.decorators" (), "topology.name" "kafka", "storm.id" >>> "kafka-1-1407257070", "topology.debug" true} >>> 1962 [main] INFO backtype.storm.daemon.nimbus - Activating kafka: >>> kafka-1-1407257070 >>> 2067 [main] INFO backtype.storm.scheduler.EvenScheduler - Available >>> slots: (["944e6152-ca58-4d2b-8325-94ac98f43995" 1] >>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 2] >>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 3] >>> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 4] >>> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 5] >>> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 6]) >>> 2088 [main] INFO backtype.storm.daemon.nimbus - Setting new assignment >>> for topology id kafka-1-1407257070: >>> #backtype.storm.daemon.common.Assignment{:master-code-dir >>> "/tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9/nimbus/stormdist/kafka-1-1407257070", >>> :node->host {"944e6152-ca58-4d2b-8325-94ac98f43995" "DO-mq-dev"}, >>> :executor->node+port {[3 3] ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [5 >>> 5] ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [4 4] >>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [2 2] >>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [1 1] >>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1]}, :executor->start-time-secs {[1 >>> 1] 1407257070, [2 2] 1407257070, [4 4] 1407257070, [5 5] 1407257070, [3 3] >>> 1407257070}} >>> 2215 [main] INFO backtype.storm.daemon.nimbus - Shutting down master >>> 2223 [main] INFO backtype.storm.daemon.nimbus - Shut down master >>> 2239 [main] INFO backtype.storm.daemon.supervisor - Shutting down >>> supervisor 944e6152-ca58-4d2b-8325-94ac98f43995 >>> 2240 [Thread-6] INFO backtype.storm.event - Event manager interrupted >>> 2241 [Thread-7] INFO backtype.storm.event - Event manager interrupted >>> 2248 [main] INFO backtype.storm.daemon.supervisor - Shutting down >>> supervisor e8303ca7-9cc4-4551-8387-7559fc3c53fc >>> 2248 [Thread-9] INFO backtype.storm.event - Event manager interrupted >>> 2248 [Thread-10] INFO backtype.storm.event - Event manager interrupted >>> 2256 [main] INFO backtype.storm.testing - Shutting down in process >>> zookeeper >>> 2257 [main] INFO backtype.storm.testing - Done shutting down in process >>> zookeeper >>> 2258 [main] INFO backtype.storm.testing - Deleting temporary path >>> /tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9 >>> 2259 [main] INFO backtype.storm.testing - Deleting temporary path >>> /tmp/dd37d0cc-79b3-4f23-b6a5-3bcf5a9f0879 >>> 2260 [main] INFO backtype.storm.testing - Deleting temporary path >>> /tmp/3e515769-ebf5-4085-a6bf-35f4ad8be388 >>> 2261 [main] INFO backtype.storm.testing - Deleting temporary path >>> /tmp/d0aeb5f4-0830-4efd-be7f-bc40d5b66912 >>> >>> Anyone can help me locate what the problem is? I really need to walk >>> through this step in order to be able to replace .each(printStream()) with >>> other functions. >>> >>> >>> Thanks >>> >>> Alec >>> >>> On Aug 4, 2014, at 4:24 AM, Marcelo Valle <mva...@redoop.org> wrote: >>> >>> hello, >>> >>> you can check your .jar application with command " jar tf " to see if >>> class kafka/api/OffsetRequest.class is part of the jar. >>> If not you can try to copy kafka-2.9.2-0.8.0.jar (or version you are >>> using) in storm_lib directory >>> >>> Marcelo >>> >>> >>> 2014-07-31 23:33 GMT+02:00 Sa Li <sa.in.v...@gmail.com>: >>> >>>> Hi, all >>>> >>>> I am running a kafka-spout code in storm-server, the pom is >>>> >>>> <groupId>org.apache.kafka</groupId> >>>> <artifactId>kafka_2.9.2</artifactId> >>>> <version>0.8.0</version> >>>> <scope>provided</scope> >>>> >>>> <exclusions> >>>> <exclusion> >>>> <groupId>org.apache.zookeeper</groupId> >>>> <artifactId>zookeeper</artifactId> >>>> </exclusion> >>>> <exclusion> >>>> <groupId>log4j</groupId> >>>> <artifactId>log4j</artifactId> >>>> </exclusion> >>>> </exclusions> >>>> >>>> </dependency> >>>> >>>> <!-- Storm-Kafka compiled --> >>>> >>>> <dependency> >>>> <artifactId>storm-kafka</artifactId> >>>> <groupId>org.apache.storm</groupId> >>>> <version>0.9.2-incubating</version> >>>> <scope>*compile*</scope> >>>> </dependency> >>>> >>>> I can mvn package it, but when I run it >>>> root@DO-mq-dev:/home/stuser/kafkaprj/kafka-storm-bitmap# storm jar >>>> target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar >>>> storm.artemis.KafkaConsumerTopology KafkaConsumerTopology >>>> >>>> >>>> I am getting such error >>>> >>>> 1657 [main] >>>> INFO com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting >>>> 1682 [main] INFO backtype.storm.daemon.supervisor - Starting >>>> supervisor with id a66e0c61-a951-4c1b-a43f-3fb0d12cb226 at host DO-mq-dev >>>> 1698 [main] ERROR org.apache.zookeeper.server.NIOServerCnxn - Thread >>>> Thread[main,5,main] died >>>> java.lang.NoClassDefFoundError: kafka/api/OffsetRequest >>>> at storm.artemis.kafka.KafkaConfig.<init>(KafkaConfig.java:26) >>>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >>>> at >>>> storm.artemis.kafka.trident.TridentKafkaConfig.<init>(TridentKafkaConfig.java:13) >>>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >>>> at >>>> storm.artemis.KafkaConsumerTopology.buildTopology(KafkaConsumerTopology.java:115) >>>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >>>> at >>>> storm.artemis.KafkaConsumerTopology.main(KafkaConsumerTopology.java:144) >>>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na] >>>> Caused by: java.lang.ClassNotFoundException: kafka.api.OffsetRequest >>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366) >>>> ~[na:1.7.0_55] >>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355) >>>> ~[na:1.7.0_55] >>>> at java.security.AccessController.doPrivileged(Native Method) >>>> ~[na:1.7.0_55] >>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354) >>>> ~[na:1.7.0_55] >>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425) >>>> ~[na:1.7.0_55] >>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) >>>> ~[na:1.7.0_55] >>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358) >>>> ~[na:1.7.0_55] >>>> >>>> >>>> >>>> >>>> I try to poke around online, could not find a solution for it, any idea >>>> about that? >>>> >>>> >>>> Thanks >>>> >>>> Alec >>>> >>>> >>>> >>>> >>> >>> >> >