Hi, Kushan

You are completely right, I noticed this after you mentioned it, apparently
I am able to consumer the messages by kafka-console-consumer.sh which
listen to 2181, but storm goes to 2000 instead.

1319 [main] INFO  backtype.storm.zookeeper - Starting inprocess zookeeper
at port 2000 and dir /tmp/f41ad971-9f6b-433f-9dc9-9797afcc2e46
1425 [main] INFO  backtype.storm.daemon.nimbus - Starting Nimbus with conf
{"dev.zookeeper.path" "/tmp/dev-storm-zookeeper",

I spent whole morning to walk through my configuration, this is the zoo.cfg

# The number of milliseconds of each tick
# The number of ticks that the initial
# synchronization phase can take
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
# the directory where the snapshot is stored.
# Place the dataLogDir to a separate physical disc for better performance
# dataLogDir=/disk2/zookeeper
# the port at which the clients will connect
# specify all zookeeper servers
# The fist port is used by followers to connect to the leader
# The second one is used for leader election

# To avoid seeks ZooKeeper allocates space in the transaction log file in
# blocks of preAllocSize kilobytes. The default block size is 64M. One
# for changing the size of the blocks is to reduce the block size if
# are taken more often. (Also, see snapCount).
# Clients can submit requests faster than ZooKeeper can process them,
# especially if there are a lot of clients. To prevent ZooKeeper from
# out of memory due to queued requests, ZooKeeper will throttle clients so
# there is no more than globalOutstandingLimit outstanding requests in the
# system. The default limit is 1,000.ZooKeeper logs transactions to a
# transaction log. After snapCount transactions are written to a log file a
# snapshot is started and a new transaction log file is started. The default
# snapCount is 10,000.

# If this option is defined, requests will be will logged to a trace file
# traceFile.year.month.day.
# Leader accepts client connections. Default value is "yes". The leader
# coordinates updates. For higher update throughput at thes slight expense
# read throughput the leader can be configured to not accept clients and
# on coordination.
# Enable regular purging of old data and transaction logs every 24 hours

Only thing that I thought to change was to make "multi-server" setup,
uncomment the server.1, server.2, server.3, but didn't help. And this is
the storm.yaml sitting in ~/.storm

      - ""
#     - "server2"
storm.zookeeper.port: 2181
nimbus.host: ""
nimbus.childopts: "-Xmx1024m"
storm.local.dir: "/app/storm"
java.library.path: "/usr/lib/jvm/java-7-openjdk-amd64"
       - 6700
       - 6701
       - 6702
       - 6703
# ##### These may optionally be filled in:
## List of custom serializations
# topology.kryo.register:
#     - org.mycompany.MyType
#     - org.mycompany.MyType2: org.mycompany.MyType2Serializer
## List of custom kryo decorators
# topology.kryo.decorators:
#     - org.mycompany.MyDecorator
## Locations of the drpc servers
     - ""
#     - "server2"
drpc.port: 3772
drpc.worker.threads: 64
drpc.queue.size: 128
drpc.invocations.port: 3773
drpc.request.timeout.secs: 600
drpc.childopts: "-Xmx768m"
## Metrics Consumers
# topology.metrics.consumer.register:
#   - class: "backtype.storm.metrics.LoggingMetricsConsumer"
#     parallelism.hint: 1
#   - class: "org.mycompany.MyMetricsConsumer"
#     parallelism.hint: 1
#     argument:
#       - endpoint: "metrics-collector.mycompany.org"

I really couldn't figure out what is trick to configure zK and storm
cluster, and why zookeeper listen to 2000 which is really a weird thing.



On Wed, Aug 6, 2014 at 6:48 AM, Kushan Maskey
wrote:

> I see that your zookeeper is listening on port 2000. Is that how you have
> configured the zookeeper?
On Tue, Aug 5, 2014 at 11:56 AM, Sa Li <sa.in.v...@gmail.com> wrote:
>> Thank you very much, Marcelo, it indeed worked, now I can run my code
>> without getting error. However, another thing is keeping bother me,
>> following is my code:
>> public static class PrintStream implements Filter {
>>     @SuppressWarnings("rawtypesā€¯)
>>  @Override
>>     public void prepare(Map conf, TridentOperationContext context) {
>>      }
>>      @Override
>>     public void cleanup() {
>>      }
>>      @Override
>>     public boolean isKeep(TridentTuple tuple) {
>>      System.out.println(tuple);
>>      return true;
>>      }
>> }
>> public static StormTopology buildTopology(LocalDRPC drpc) throws IOException
>> {
>>             TridentTopology topology = new TridentTopology();
>>          BrokerHosts zk = new ZkHosts("localhost");
>>          TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk,
>> "ingest_test");
>>          spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>          OpaqueTridentKafkaSpout spout = new
>>  OpaqueTridentKafkaSpout(spoutConf);
>>                  topology.newStream("kafka", spout)
>>                                          .each(new Fields("str"),
>>                          new PrintStream()
>>                  );
>>                 return topology.build();
>> }
>> public static void main(String[] args) throws Exception {
>>        Config conf = new Config();
>>        conf.setDebug(true);
>>                     conf.setMaxSpoutPending(1);
>>             conf.setMaxTaskParallelism(3);
>>              LocalDRPC drpc = new LocalDRPC();
>>             LocalCluster cluster = new LocalCluster();
>>              cluster.submitTopology("kafka", conf, buildTopology(drpc));
>>              Thread.sleep(100);
>>              cluster.shutdown();
>> }
>> What I expect is quite simple, print out the message I collect from a
>> kafka producer playback process which is running separately. The topic is
>> listed as:
>> root@DO-mq-dev:/etc/kafka# bin/kafka-list-topic.sh --zookeeper
>> localhost:2181
>> topic: topictest        partition: 0    leader: 1       replicas: 1,3,2
>> isr: 1,3,2
>> topic: topictest        partition: 1    leader: 2       replicas: 2,1,3
>> isr: 2,1,3
>> topic: topictest        partition: 2    leader: 3       replicas: 3,2,1
>> isr: 3,2,1
>> topic: topictest        partition: 3    leader: 1       replicas: 1,2,3
>> isr: 1,2,3
>> topic: topictest        partition: 4    leader: 2       replicas: 2,3,1
>> isr: 2,3,1
>> When I am running the code, this is what I saw on the screen, seems no
>> error, but no message print out as well:
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/etc/storm-!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/home/stuser/kafkaprj/kafka-storm-bitmap/target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> Running: java -client -Dstorm.options= -Dstorm.home=/etc/storm-
>> -Djava.library.path=/usr/lib/jvm/java-7-openjdk-amd64 -Dstorm.conf.file=
>> -cp
>> /etc/storm-
>> -Dstorm.jar=target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar
>> storm.artemis.KafkaConsumerTopology KafkaConsumerTopology
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/etc/storm-!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/home/stuser/kafkaprj/kafka-storm-bitmap/target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> 1113 [main] INFO  backtype.storm.zookeeper - Starting inprocess zookeeper
>> at port 2000 and dir /tmp/dd37d0cc-79b3-4f23-b6a5-3bcf5a9f0879
>> 1216 [main] INFO  backtype.storm.daemon.nimbus - Starting Nimbus with
>> conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper",
>> "topology.tick.tuple.freq.secs" nil,
>> "topology.builtin.metrics.bucket.size.secs" 60,
>> "topology.fall.back.on.java.serialization" true,
>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0,
>> "topology.skip.missing.kryo.registrations" true,
>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m",
>> "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true,
>> "topology.trident.batch.emit.interval.millis" 50,
>> "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m",
>> "java.library.path" "/usr/lib/jvm/java-7-openjdk-amd64",
>> "topology.executor.send.buffer.size" 1024, "storm.local.dir"
>> "/tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9",
>> "storm.messaging.netty.buffer_size" 5242880,
>> "supervisor.worker.start.timeout.secs" 120,
>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs"
>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64,
>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "",
>> "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000,
>> "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size"
>> 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root"
>> "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000,
>> "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1,
>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root"
>> "/transactional", "topology.acker.executors" nil,
>> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil,
>> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m",
>> "supervisor.heartbeat.frequency.secs" 5,
>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772,
>> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m",
>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3,
>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30,
>> "topology.spout.wait.strategy"
>> "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.max.spout.pending"
>> nil, "storm.zookeeper.retry.interval" 1000, "
>> topology.sleep.spout.wait.strategy.time.ms" 1,
>> "nimbus.topology.validator"
>> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports"
>> [6700 6701 6702 6703], "topology.debug" false, "nimbus.task.launch.secs"
>> 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs"
>> 30, "task.refresh.poll.secs" 10, "topology.workers" 1,
>> "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627,
>> "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1,
>> "topology.tuple.serializer"
>> "backtype.storm.serialization.types.ListDelegateSerializer",
>> "topology.disruptor.wait.strategy"
>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30,
>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory"
>> "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port"
>> 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times"
>> 5, "storm.thrift.transport"
>> "backtype.storm.security.auth.SimpleTransportPlugin",
>> "topology.state.synchronization.timeout.secs" 60,
>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs"
>> 600, "storm.messaging.transport" "backtype.storm.messaging.zmq", "
>> logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000,
>> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port"
>> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local",
>> "topology.optimize" true, "topology.max.task.parallelism" nil}
>> 1219 [main] INFO  backtype.storm.daemon.nimbus - Using default scheduler
>> 1237 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl
>> - Starting
>> 1303 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state
>> update: :connected:none
>> 1350 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl
>> - Starting
>> 1417 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl
>> - Starting
>> 1432 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state
>> update: :connected:none
>> 1482 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl
>> - Starting
>> 1484 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl
>> - Starting
>> 1532 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state
>> update: :connected:none
>> 1540 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl
>> - Starting
>> 1568 [main] INFO  backtype.storm.daemon.supervisor - Starting Supervisor
>> with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper",
>> "topology.tick.tuple.freq.secs" nil,
>> "topology.builtin.metrics.bucket.size.secs" 60,
>> "topology.fall.back.on.java.serialization" true,
>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0,
>> "topology.skip.missing.kryo.registrations" true,
>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m",
>> "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true,
>> "topology.trident.batch.emit.interval.millis" 50,
>> "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m",
>> "java.library.path" "/usr/lib/jvm/java-7-openjdk-amd64",
>> "topology.executor.send.buffer.size" 1024, "storm.local.dir"
>> "/tmp/3e515769-ebf5-4085-a6bf-35f4ad8be388",
>> "storm.messaging.netty.buffer_size" 5242880,
>> "supervisor.worker.start.timeout.secs" 120,
>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs"
>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64,
>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "",
>> "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000,
>> "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size"
>> 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root"
>> "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000,
>> "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1,
>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root"
>> "/transactional", "topology.acker.executors" nil,
>> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil,
>> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m",
>> "supervisor.heartbeat.frequency.secs" 5,
>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772,
>> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m",
>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3,
>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30,
>> "topology.spout.wait.strategy"
>> "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.max.spout.pending"
>> nil, "storm.zookeeper.retry.interval" 1000, "
>> topology.sleep.spout.wait.strategy.time.ms" 1,
>> "nimbus.topology.validator"
>> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports"
>> (1 2 3), "topology.debug" false, "nimbus.task.launch.secs" 120,
>> "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30,
>> "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts"
>> "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05,
>> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer"
>> "backtype.storm.serialization.types.ListDelegateSerializer",
>> "topology.disruptor.wait.strategy"
>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30,
>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory"
>> "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port"
>> 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times"
>> 5, "storm.thrift.transport"
>> "backtype.storm.security.auth.SimpleTransportPlugin",
>> "topology.state.synchronization.timeout.secs" 60,
>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs"
>> 600, "storm.messaging.transport" "backtype.storm.messaging.zmq", "
>> logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000,
>> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port"
>> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local",
>> "topology.optimize" true, "topology.max.task.parallelism" nil}
>> 1576 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl
>> - Starting
>> 1582 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state
>> update: :connected:none
>> 1590 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl
>> - Starting
>> 1632 [main] INFO  backtype.storm.daemon.supervisor - Starting supervisor
>> with id 944e6152-ca58-4d2b-8325-94ac98f43995 at host DO-mq-dev
>> 1636 [main] INFO  backtype.storm.daemon.supervisor - Starting Supervisor
>> with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper",
>> "topology.tick.tuple.freq.secs" nil,
>> "topology.builtin.metrics.bucket.size.secs" 60,
>> "topology.fall.back.on.java.serialization" true,
>> "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0,
>> "topology.skip.missing.kryo.registrations" true,
>> "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m",
>> "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true,
>> "topology.trident.batch.emit.interval.millis" 50,
>> "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m",
>> "java.library.path" "/usr/lib/jvm/java-7-openjdk-amd64",
>> "topology.executor.send.buffer.size" 1024, "storm.local.dir"
>> "/tmp/d0aeb5f4-0830-4efd-be7f-bc40d5b66912",
>> "storm.messaging.netty.buffer_size" 5242880,
>> "supervisor.worker.start.timeout.secs" 120,
>> "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs"
>> 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64,
>> "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "",
>> "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000,
>> "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size"
>> 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root"
>> "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000,
>> "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1,
>> "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root"
>> "/transactional", "topology.acker.executors" nil,
>> "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil,
>> "drpc.queue.size" 128, "worker.childopts" "-Xmx768m",
>> "supervisor.heartbeat.frequency.secs" 5,
>> "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772,
>> "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m",
>> "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3,
>> "topology.tasks" nil, "storm.messaging.netty.max_retries" 30,
>> "topology.spout.wait.strategy"
>> "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.max.spout.pending"
>> nil, "storm.zookeeper.retry.interval" 1000, "
>> topology.sleep.spout.wait.strategy.time.ms" 1,
>> "nimbus.topology.validator"
>> "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports"
>> (4 5 6), "topology.debug" false, "nimbus.task.launch.secs" 120,
>> "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30,
>> "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts"
>> "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05,
>> "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer"
>> "backtype.storm.serialization.types.ListDelegateSerializer",
>> "topology.disruptor.wait.strategy"
>> "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 30,
>> "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory"
>> "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port"
>> 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times"
>> 5, "storm.thrift.transport"
>> "backtype.storm.security.auth.SimpleTransportPlugin",
>> "topology.state.synchronization.timeout.secs" 60,
>> "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs"
>> 600, "storm.messaging.transport" "backtype.storm.messaging.zmq", "
>> logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000,
>> "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port"
>> 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local",
>> "topology.optimize" true, "topology.max.task.parallelism" nil}
>> 1638 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl
>> - Starting
>> 1648 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state
>> update: :connected:none
>> 1690 [main] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl
>> - Starting
>> 1740 [main] INFO  backtype.storm.daemon.supervisor - Starting supervisor
>> with id e8303ca7-9cc4-4551-8387-7559fc3c53fc at host DO-mq-dev
>> 1944 [main] INFO  backtype.storm.daemon.nimbus - Received topology
>> submission for kafka with conf {"topology.max.task.parallelism" nil,
>> "topology.acker.executors" nil, "topology.kryo.register"
>> {"storm.trident.topology.TransactionAttempt" nil},
>> "topology.kryo.decorators" (), "topology.name" "kafka", "storm.id"
>> "kafka-1-1407257070", "topology.debug" true}
>> 1962 [main] INFO  backtype.storm.daemon.nimbus - Activating kafka:
>> kafka-1-1407257070
>> 2067 [main] INFO  backtype.storm.scheduler.EvenScheduler - Available
>> slots: (["944e6152-ca58-4d2b-8325-94ac98f43995" 1]
>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 2]
>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 3]
>> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 4]
>> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 5]
>> ["e8303ca7-9cc4-4551-8387-7559fc3c53fc" 6])
>> 2088 [main] INFO  backtype.storm.daemon.nimbus - Setting new assignment
>> for topology id kafka-1-1407257070:
>> #backtype.storm.daemon.common.Assignment{:master-code-dir
>> "/tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9/nimbus/stormdist/kafka-1-1407257070",
>> :node->host {"944e6152-ca58-4d2b-8325-94ac98f43995" "DO-mq-dev"},
>> :executor->node+port {[3 3] ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [5
>> 5] ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [4 4]
>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [2 2]
>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1], [1 1]
>> ["944e6152-ca58-4d2b-8325-94ac98f43995" 1]}, :executor->start-time-secs {[1
>> 1] 1407257070, [2 2] 1407257070, [4 4] 1407257070, [5 5] 1407257070, [3 3]
>> 1407257070}}
>> 2215 [main] INFO  backtype.storm.daemon.nimbus - Shutting down master
>> 2223 [main] INFO  backtype.storm.daemon.nimbus - Shut down master
>> 2239 [main] INFO  backtype.storm.daemon.supervisor - Shutting down
>> supervisor 944e6152-ca58-4d2b-8325-94ac98f43995
>> 2240 [Thread-6] INFO  backtype.storm.event - Event manager interrupted
>> 2241 [Thread-7] INFO  backtype.storm.event - Event manager interrupted
>> 2248 [main] INFO  backtype.storm.daemon.supervisor - Shutting down
>> supervisor e8303ca7-9cc4-4551-8387-7559fc3c53fc
>> 2248 [Thread-9] INFO  backtype.storm.event - Event manager interrupted
>> 2248 [Thread-10] INFO  backtype.storm.event - Event manager interrupted
>> 2256 [main] INFO  backtype.storm.testing - Shutting down in process
>> zookeeper
>> 2257 [main] INFO  backtype.storm.testing - Done shutting down in process
>> zookeeper
>> 2258 [main] INFO  backtype.storm.testing - Deleting temporary path
>> /tmp/cf44f174-2cda-4e67-8c85-e9f96897fcd9
>> 2259 [main] INFO  backtype.storm.testing - Deleting temporary path
>> /tmp/dd37d0cc-79b3-4f23-b6a5-3bcf5a9f0879
>> 2260 [main] INFO  backtype.storm.testing - Deleting temporary path
>> /tmp/3e515769-ebf5-4085-a6bf-35f4ad8be388
>> 2261 [main] INFO  backtype.storm.testing - Deleting temporary path
>> /tmp/d0aeb5f4-0830-4efd-be7f-bc40d5b66912
>> Anyone can help me locate what the problem is? I really need to walk
>> through this step in order to be able to replace .each(printStream()) with
>> other functions.
>> Thanks
>> Alec
On Aug 4, 2014, at 4:24 AM, Marcelo Valle <mva...@redoop.org> wrote:
>> hello,
>> you can check your .jar application with command " jar tf " to see if
>> class kafka/api/OffsetRequest.class is part of the jar.
>> If not you can try to copy kafka-2.9.2-0.8.0.jar (or version you are
>> using) in storm_lib directory
>> Marcelo
2014-07-31 23:33 GMT+02:00 Sa Li <sa.in.v...@gmail.com>:
>>> Hi, all
>>> I am running a kafka-spout code in storm-server, the pom is
>>>   <groupId>org.apache.kafka</groupId>
>>>         <artifactId>kafka_2.9.2</artifactId>
>>>         <version>0.8.0</version>
>>>                 <scope>provided</scope>
>>>                 <exclusions>
>>>                   <exclusion>
>>>                        <groupId>org.apache.zookeeper</groupId>
>>>                        <artifactId>zookeeper</artifactId>
>>>                   </exclusion>
>>>                   <exclusion>
>>>                        <groupId>log4j</groupId>
>>>                        <artifactId>log4j</artifactId>
>>>                   </exclusion>
>>>                 </exclusions>
>>>     </dependency>
>>>             <!-- Storm-Kafka compiled -->
>>>         <dependency>
>>>             <artifactId>storm-kafka</artifactId>
>>>             <groupId>org.apache.storm</groupId>
>>>             <version>0.9.2-incubating</version>
>>>             <scope>*compile*</scope>
>>>         </dependency>
>>> I can mvn package it, but when I run it
>>> root@DO-mq-dev:/home/stuser/kafkaprj/kafka-storm-bitmap# storm jar
>>> target/kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar
>>> storm.artemis.KafkaConsumerTopology KafkaConsumerTopology
>>> I am getting such error
>>> 1657 [main]
>>> INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
>>> 1682 [main] INFO  backtype.storm.daemon.supervisor - Starting supervisor
>>> with id a66e0c61-a951-4c1b-a43f-3fb0d12cb226 at host DO-mq-dev
>>> 1698 [main] ERROR org.apache.zookeeper.server.NIOServerCnxn - Thread
>>> Thread[main,5,main] died
>>> java.lang.NoClassDefFoundError: kafka/api/OffsetRequest
>>> at storm.artemis.kafka.KafkaConfig.<init>(KafkaConfig.java:26)
>>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>>  at
>>> storm.artemis.kafka.trident.TridentKafkaConfig.<init>(TridentKafkaConfig.java:13)
>>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>>  at
>>> storm.artemis.KafkaConsumerTopology.buildTopology(KafkaConsumerTopology.java:115)
>>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>>  at
>>> storm.artemis.KafkaConsumerTopology.main(KafkaConsumerTopology.java:144)
>>> ~[kafka-storm-bitmap-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>>> Caused by: java.lang.ClassNotFoundException: kafka.api.OffsetRequest
>>>  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>> ~[na:1.7.0_55]
>>>  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>> ~[na:1.7.0_55]
>>>  at java.security.AccessController.doPrivileged(Native Method)
>>> ~[na:1.7.0_55]
>>>  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>> ~[na:1.7.0_55]
>>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:425) ~[na:1.7.0_55]
>>>  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>> ~[na:1.7.0_55]
>>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:358) ~[na:1.7.0_55]
>>> I try to poke around online, could not find a solution for it, any idea
>>> about that?
>>> Thanks
>>> Alec

