Hi, thanks for the reply.

I did check with zkCli:

[zk: 127.0.0.1:2181(CONNECTED) 0] ls /
[consumers, controller, brokers, zookeeper, controller_epoch]
[zk: 127.0.0.1:2181(CONNECTED) 1] ls /brokers
[topics, ids]
[zk: 127.0.0.1:2181(CONNECTED) 3] get /brokers
null
cZxid = 0x631
ctime = Thu Jul 17 20:48:15 PDT 2014
mZxid = 0x631
mtime = Thu Jul 17 20:48:15 PDT 2014
pZxid = 0x63f
cversion = 2
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 2

The topic I created is ingest_topic. How can I find it from zkCli? I can use kafka-console-consumer to consume the messages.

Thanks
Alec

On Jul 30, 2014, at 11:12 AM, Parth Brahmbhatt <pbrahmbh...@hortonworks.com> wrote:

> Hi,
>
> Nothing really wrong with your code. Can you confirm that zookeeper is indeed
> running on localhost:2181, and can you log into the zookeeper CLI (zkCli) and
> check that "ls /" returns the "brokers" directory, under which you should be
> able to find your topic? This is the default directory that ZkHosts.java looks for.
>
> Thanks
> Parth
>
> On Tue, Jul 29, 2014 at 11:17 PM, Palak Shah <spala...@gmail.com> wrote:
> Hi,
>
> I am using the Kafka spout that is integrated in the
> apache-storm-0.9.2-incubating release. I am able to submit the topology to my
> storm cluster, but it is not receiving any tuples from the Kafka topic. I
> know the topic ("page_visits") has data because I can read it from the
> console.
>
> Here is my code for the topology:
>
> public static void main(String[] args) throws AlreadyAliveException,
>         InvalidTopologyException {
>     BrokerHosts zkHost = new ZkHosts("localhost:2181");
>     SpoutConfig spoutConfig = new SpoutConfig(
>             zkHost,         // list of Kafka brokers
>             "page_visits",  // topic to read from
>             "/zkroot",      // the root path in Zookeeper for the spout to store the consumer offsets
>             "discovery");   // an id for this consumer for storing the consumer offsets in Zookeeper
>     spoutConfig.forceFromStart = true;
>     KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
>
>     TopologyBuilder builder = new TopologyBuilder();
>     builder.setSpout("kafkaSpout", kafkaSpout);
>     builder.setBolt("kafkaBolt", new PrinterBolt()).shuffleGrouping("kafkaSpout");
>
>     Config conf = new Config();
>     conf.setNumWorkers(4);
>     conf.setDebug(true);
>
>     StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
> }
>
> }
>
> I am using apache-storm-0.9.2-incubating and kafka-0.9.2-0.8.1.1. Is this a
> version compatibility issue? If so, which version should I use for this to
> work?
>
> Thanks in advance,
> Palak Shah
>
> --
> Thanks
> Parth
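
For reference, here is a minimal sketch of the ZooKeeper paths involved. It assumes the Kafka 0.8 layout, where topics are registered under <root>/topics/<topic> and broker ids under <root>/ids, and the default "/brokers" root mentioned above. The class BrokerZkPaths and its methods are hypothetical helpers for illustration, not part of storm-kafka or Kafka. It only builds path strings and does not connect to ZooKeeper.

```java
// Hypothetical helper (not part of storm-kafka or Kafka): builds the
// ZooKeeper paths where a Kafka 0.8 broker is assumed to register
// its ids and topics.
class BrokerZkPaths {
    // Default broker root; the thread refers to it as the "brokers" directory.
    static final String DEFAULT_BROKER_ROOT = "/brokers";

    // Node whose children are the topic names.
    static String topicsPath(String root) {
        return root + "/topics";
    }

    // Node for a single topic.
    static String topicPath(String root, String topic) {
        return topicsPath(root) + "/" + topic;
    }

    // Node whose children are the partition numbers of a topic.
    static String partitionsPath(String root, String topic) {
        return topicPath(root, topic) + "/partitions";
    }

    // Node whose children are the registered broker ids.
    static String brokerIdsPath(String root) {
        return root + "/ids";
    }

    public static void main(String[] args) {
        // Topic name from the thread; pass another name as the first argument.
        String topic = args.length > 0 ? args[0] : "ingest_topic";
        // Print the zkCli commands one could try for this topic.
        System.out.println("ls " + topicsPath(DEFAULT_BROKER_ROOT));
        System.out.println("ls " + partitionsPath(DEFAULT_BROKER_ROOT, topic));
        System.out.println("ls " + brokerIdsPath(DEFAULT_BROKER_ROOT));
    }
}
```

Under these assumptions, running "ls /brokers/topics" in zkCli should list ingest_topic if the broker has registered it, and "ls /brokers/ids" should show at least one broker id.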