I am using BaseBasicBolt because I was told it acks tuples automatically. Should we not use it?
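On the acking question: as I understand Storm's design, a bolt that extends `BaseBasicBolt` runs inside a wrapper that acks each input tuple once `execute` returns normally, and fails it if `execute` throws a `FailedException`. Such a bolt therefore does not need to call `collector.ack(input)` itself. Explicit acking is what a `BaseRichBolt` does. This matters here because, as far as I know, the Kafka spout only moves its committed offset past tuples that have been acked. The stdlib-only toy below models that wrapper. `AutoAckModel`, `BasicBolt` and `AutoAckExecutor` are hypothetical stand-ins, and the nested `FailedException` only imitates Storm's class of the same name. None of this is Storm's real API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of BaseBasicBolt-style auto-acking. Every type here is a
// hypothetical stand-in for illustration; none of it is Storm's real API.
final class AutoAckModel {

    /** Stand-in for an exception that asks the wrapper to fail the tuple. */
    static final class FailedException extends RuntimeException {
        FailedException(String message) { super(message); }
    }

    /** Stand-in for a user bolt: it processes input and never acks. */
    interface BasicBolt {
        void execute(String tuple);
    }

    /** Stand-in for the executor that acks/fails on the bolt's behalf. */
    static final class AutoAckExecutor {
        final List<String> acked = new ArrayList<>();
        final List<String> failed = new ArrayList<>();
        private final BasicBolt bolt;

        AutoAckExecutor(BasicBolt bolt) { this.bolt = bolt; }

        void process(String tuple) {
            try {
                bolt.execute(tuple);
                acked.add(tuple);   // execute() returned normally: ack
            } catch (FailedException e) {
                failed.add(tuple);  // explicit failure: fail, so the spout can replay it
            }
            // Any other exception propagates to the caller untouched.
        }
    }

    public static void main(String[] args) {
        AutoAckExecutor executor = new AutoAckExecutor(tuple -> {
            if (tuple.startsWith("BAD")) {
                throw new FailedException("cannot decode " + tuple);
            }
            // ... normal processing would go here; no ack call needed ...
        });
        executor.process("price-1");
        executor.process("BAD-price-2");
        System.out.println("acked=" + executor.acked + " failed=" + executor.failed);
        // prints: acked=[price-1] failed=[BAD-price-2]
    }
}
```

In other words, with `BaseBasicBolt` an explicit `collector.ack(input)` should not be needed. If offsets still never reach ZooKeeper, the acking path is probably not the cause.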
> On 21 May 2015, at 15:24, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>
> Are you acking in your bolt? collector.ack(input);
>
> 2015-05-21 11:19 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
> Okay, so this is interesting:
>
> I have your configuration in my local Eclipse, with a remote ZooKeeper that Kafka is connected to:
>
> ZkHosts zkhost = new ZkHosts("1.1.1.1:2181", "/brokers"); // /brokers -> kafka broker
> SpoutConfig spoutconfig = new SpoutConfig(zkhost, "/prices", "/brokers", "rawWarehousePriceSpout");
>
> ZooKeeper:
>
> [zk: 127.0.0.1:2181(CONNECTED) 80] ls /brokers
> [rawWarehousePriceSpout, topics, ids]
> [zk: 127.0.0.1:2181(CONNECTED) 81] ls /brokers/rawWarehousePriceSpout
> []
> [zk: 127.0.0.1:2181(CONNECTED) 82]
>
> Logs I see:
>
> 41994 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=1}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=2}]
> 41994 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
> 41994 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=1}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=2}]
> 42094 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /brokers/rawWarehousePriceSpout/partition_0 --> null
> 42363 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
> 42363 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from offset 425530
> 42366 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /brokers/rawWarehousePriceSpout/partition_1 --> null
> 42400 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
> 42400 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:1 from offset 550715
> 42401 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /brokers/rawWarehousePriceSpout/partition_2 --> null
> 42432 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
> 42432 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:2 from offset 439547
> 42432 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
> 44652 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x14d76d4418d000c type:create cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for /brokers/rawWarehousePriceSpout
>
>
>> On 21 May 2015, at 15:02, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>
>> I'm sorry, I made a mistake:
>>
>> ZkHosts zkhost = new ZkHosts("localhost:2181", "/brokers"); // /brokers -> kafka broker
>> SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history", "/brokers", "kafka-spout"); // --> storm --> zookeeper
>>
>> As you can see, it is not an ERROR but an INFO message, which is usually logged when the directory structure does not exist yet.
>>
>> Take a look at the Kafka broker's ZooKeeper (port 2181)
>> ----------------------------------
>>
>> -- bin/zookeeper-shell.sh localhost:2181
>>
>> ls /brokers/topics/bid_history/partitions/0/state
>> []
>>
>> get /brokers/topics/bid_history/partitions/0/state
>> {"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
>> cZxid = 0x113
>> ctime = Tue May 19 18:19:24 BRT 2015
>> mZxid = 0x113
>> mtime = Tue May 19 18:19:24 BRT 2015
>> pZxid = 0x113
>> cversion = 0
>> dataVersion = 0
>> aclVersion = 0
>> ephemeralOwner = 0x0
>> dataLength = 72
>> numChildren = 0
>>
>>
>> Take a look at Storm's ZooKeeper (port 2000)
>> ----------------------------------
>> Start the topology, send a message, and then look:
>>
>> bin/zookeeper-shell.sh localhost:2000
>> Connecting to localhost:2000
>> Welcome to ZooKeeper!
>> JLine support is disabled
>>
>> ls /
>> [storm, brokers, zookeeper]
>> ls /brokers
>> [kafka-spout]
>> ls /brokers/kafka-spout
>> [partition_0]
>> ls /brokers/kafka-spout/partition_0
>> []
>> get /brokers/kafka-spout/partition_0
>> {"topology":{"id":"a9be1962-6b4e-4ed4-ae68-155a1948a1f6","name":"consolidate_reports"},"offset":4426029,"partition":0,"broker":{"host":"localhost","port":9092},"topic":"bid_history"}
>> cZxid = 0x50
>> ctime = Thu May 21 11:00:48 BRT 2015
>> mZxid = 0x50
>> mtime = Thu May 21 11:00:48 BRT 2015
>> pZxid = 0x50
>> cversion = 0
>> dataVersion = 0
>> aclVersion = 0
>> ephemeralOwner = 0x0
>> dataLength = 182
>> numChildren = 0
>>
>>
>> 2015-05-21 10:43 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>> Because I do see this error in the Storm logs:
>>
>> 5304 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x14d76b1ea54000c type:create cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for /brokers/rawWarehousePriceSpout
>>
>>
>>> On 21 May 2015, at 14:04, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>
>>> You have it wrong:
>>>
>>> zookeeperPath, // the root path in ZooKeeper for the spout to store the consumer offsets
>>>
>>> That path is for your KafkaSpout to find the broker metadata (topics, partitions).
>>>
>>> You have to override:
>>>
>>> SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history", "/brokers", "kafka-spout");
>>> spoutconfig.zkServers
>>> spoutconfig.zkPort
>>>
>>> Take a look at KafkaSpout.class and step through this part in debug mode (starting at line 73 of KafkaSpout.java):
>>>
>>> Map stateConf = new HashMap(conf);
>>> List<String> zkServers = _spoutConfig.zkServers;
>>> if (zkServers == null) {
>>>     zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
>>> }
>>> Integer zkPort = _spoutConfig.zkPort;
>>> if (zkPort == null) {
>>>     zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
>>> }
>>>
>>>
>>> 2015-05-21 9:53 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>> So when I run it from Eclipse in local mode, I am pointing to the same ZooKeeper as when it is deployed. Either way, it still does not write the offset.
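The fallback in the quoted `KafkaSpout` excerpt decides which ZooKeeper the spout keeps its offsets in. It uses `spoutConfig.zkServers`/`zkPort` when they are set. Otherwise it uses the topology's `storm.zookeeper.servers`/`storm.zookeeper.port`, which in local mode point at the embedded ZooKeeper (port 2000, per the messages in this thread). A stdlib-only model of that logic follows. `ZkFallbackModel`, `ZkTarget` and `resolve` are hypothetical names, and a plain `Map` stands in for Storm's topology config. The config values in `main` are illustrative, not taken from a real cluster.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Model of the zkServers/zkPort fallback in the quoted KafkaSpout excerpt.
// ZkFallbackModel, ZkTarget and resolve() are hypothetical names.
final class ZkFallbackModel {

    // Same string keys as Storm's Config.STORM_ZOOKEEPER_SERVERS / _PORT.
    static final String STORM_ZOOKEEPER_SERVERS = "storm.zookeeper.servers";
    static final String STORM_ZOOKEEPER_PORT = "storm.zookeeper.port";

    static final class ZkTarget {
        final List<String> servers;
        final int port;
        ZkTarget(List<String> servers, int port) { this.servers = servers; this.port = port; }
        @Override public String toString() { return servers + ":" + port; }
    }

    /** spoutZkServers/spoutZkPort play the role of spoutConfig.zkServers/zkPort (null = not set). */
    @SuppressWarnings("unchecked")
    static ZkTarget resolve(List<String> spoutZkServers, Integer spoutZkPort, Map<String, Object> stormConf) {
        List<String> servers = spoutZkServers;
        if (servers == null) {
            servers = (List<String>) stormConf.get(STORM_ZOOKEEPER_SERVERS);
        }
        Integer port = spoutZkPort;
        if (port == null) {
            port = ((Number) stormConf.get(STORM_ZOOKEEPER_PORT)).intValue();
        }
        return new ZkTarget(servers, port);
    }

    public static void main(String[] args) {
        Map<String, Object> localModeConf = new HashMap<>();
        localModeConf.put(STORM_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
        localModeConf.put(STORM_ZOOKEEPER_PORT, 2000L); // illustrative: embedded local-mode ZooKeeper

        // Nothing overridden: offsets go to the topology's (embedded) ZooKeeper.
        System.out.println(resolve(null, null, localModeConf));   // [localhost]:2000
        // Both fields overridden: offsets go to the Kafka ZooKeeper instead.
        System.out.println(resolve(Arrays.asList("1.1.1.1"), 2181, localModeConf)); // [1.1.1.1]:2181
    }
}
```

With nothing overridden, the model resolves to the topology's own ZooKeeper. Setting both fields redirects the offsets to the Kafka ZooKeeper, which is the override Cristian suggests.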
>>>
>>> The only difference is that my spout is configured like so:
>>>
>>> BrokerHosts hosts = new ZkHosts(zookeeperQuorum);
>>>
>>> String zookeeperPath = KAFKA_STORM_DIR + "/" + topic;
>>> SpoutConfig spoutConfig = new SpoutConfig(
>>>         hosts,
>>>         topic,          // topic to read from
>>>         zookeeperPath,  // the root path in ZooKeeper for the spout to store the consumer offsets
>>>         spoutId);       // an id for this consumer for storing the consumer offsets in ZooKeeper
>>>
>>> // Check if we should be consuming messages from the beginning
>>> spoutConfig.forceFromStart = consumeFromBeginning;
>>> spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
>>> spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
>>> spoutConfig.scheme = new SchemeAsMultiScheme(new KafkaAnalyticsMessageDecoder());
>>>
>>>
>>>> On 21 May 2015, at 13:50, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>>
>>>> I have the same problem when I run Storm in local cluster mode, but that is because local mode has an embedded ZooKeeper: every time it restarts, the offset is null, which is why you see /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2 --> null.
>>>>
>>>> Try running in local mode, send some messages to the spout, and then check the offset in ZooKeeper (port 2000).
>>>>
>>>> Or use Kafka's ZooKeeper by overriding:
>>>>
>>>> spoutconfig.zkServers
>>>> spoutconfig.zkPort
>>>>
>>>>
>>>> 2015-05-21 9:39 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>> Thanks,
>>>>
>>>> Yup, Kafka creates them on startup when the topic gets its first message. And, judging from the logs below, Storm never actually commits the offset to ZooKeeper.
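Judging by the log lines and `ls` output in this thread, the spout reads and writes the offset for each partition at `<zkRoot>/<id>/partition_<n>`, for example `/kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2`. Cristian's `get` output shows the JSON stored at such a node. The sketch below only reproduces that layout from the thread's own data, assuming `KAFKA_STORM_DIR` is `/kafkastorm` as the logged paths suggest. `OffsetNodeModel`, `offsetPath` and `offsetJson` are hypothetical helpers, and the key order is copied from the `get` output rather than from the storm-kafka source.

```java
import java.util.Locale;

// Rebuilds the ZooKeeper offset node path and value seen in this thread.
// OffsetNodeModel/offsetPath/offsetJson are hypothetical helpers, not storm-kafka API.
final class OffsetNodeModel {

    /** zkRoot and spoutId are the 3rd and 4th SpoutConfig constructor arguments. */
    static String offsetPath(String zkRoot, String spoutId, int partition) {
        return zkRoot + "/" + spoutId + "/partition_" + partition;
    }

    /** Same key layout as the value Cristian read with get /brokers/kafka-spout/partition_0. */
    static String offsetJson(String topologyId, String topologyName, long offset,
                             int partition, String brokerHost, int brokerPort, String topic) {
        return String.format(Locale.ROOT,
                "{\"topology\":{\"id\":\"%s\",\"name\":\"%s\"},\"offset\":%d,\"partition\":%d,"
                        + "\"broker\":{\"host\":\"%s\",\"port\":%d},\"topic\":\"%s\"}",
                topologyId, topologyName, offset, partition, brokerHost, brokerPort, topic);
    }

    public static void main(String[] args) {
        // Benjamin's setup, assuming KAFKA_STORM_DIR = "/kafkastorm"
        System.out.println(offsetPath("/kafkastorm/warehouse_prices", "rawWarehousePriceSpout", 2));
        // Cristian's setup: zkRoot "/brokers", id "kafka-spout"
        System.out.println(offsetPath("/brokers", "kafka-spout", 0));
        System.out.println(offsetJson("a9be1962-6b4e-4ed4-ae68-155a1948a1f6", "consolidate_reports",
                4426029L, 0, "localhost", 9092, "bid_history"));
    }
}
```

The rebuilt value is 182 characters long, which matches `dataLength = 182` in Cristian's output. An empty `ls` under `<zkRoot>/<id>` therefore suggests that no offset has been committed there yet.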
>>>>
>>>> ZooKeeper, Kafka topic details:
>>>>
>>>> [zk: 127.0.0.1:2181(CONNECTED) 41] ls /brokers/topics/warehouse_prices/partitions
>>>> [2, 1, 0]
>>>> [zk: 127.0.0.1:2181(CONNECTED) 42]
>>>>
>>>> ZooKeeper, Storm:
>>>>
>>>> [zk: 127.0.0.1:2181(CONNECTED) 42] ls /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>>> []
>>>> [zk: 127.0.0.1:2181(CONNECTED) 43]
>>>>
>>>>
>>>> Storm logs:
>>>>
>>>> 44325 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=1}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=2}]
>>>> 44491 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0 --> null
>>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
>>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
>>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from offset 0
>>>> 44749 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_1 --> null
>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
>>>> 44779 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:1 from offset 0
>>>> 44781 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2 --> null
>>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
>>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:2 from offset 0
>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>>>> 47438 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x14d7658db3c000c type:create cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error Path:/kafkastorm/warehouse_prices/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>>> 104184 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils - No data found in Kafka Partition partition_0
>>>> 104828 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
>>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092, 1=price-engine-demo-server.c.celertech-01.internal:9092, 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=1}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=2}]
>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>>>> 164204 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils - No data found in Kafka Partition partition_0
>>>> 165063 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092, 1=price-engine-demo-server.c.celertech-01.internal:9092, 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=1}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=2}]
>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>>>> 224233 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils - No data found in Kafka Partition partition_0
>>>> 225308 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
>>>>
>>>>
>>>>> On 21 May 2015, at 13:28, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>>>
>>>>> ZooKeeper creates nothing on startup; you have to create your partitions in the Kafka broker:
>>>>>
>>>>> bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic click_history --replication-factor 1 --partitions 10
>>>>>
>>>>>
>>>>> 2015-05-21 8:58 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>>> All,
>>>>>
>>>>> We changed our paths in ZooKeeper and we are now seeing:
>>>>>
>>>>> java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/warehouse_prices/partitions
>>>>>     at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>     at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>     at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>     at storm.kafka.KafkaSpout.open(KafkaSpout.java:87) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>     at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522) ~[storm-core-0.9.4.jar:0.9.4]
>>>>>     at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.4.jar:0.9.4]
>>>>>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>>     at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
>>>>>
>>>>> Shouldn't this be discovered by the spout on startup?
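The exception names the node that the spout's broker reader could not find. Comparing that path with the `ZkHosts(..., "/brokers")` calls and the `zookeeper-shell` output elsewhere in this thread, the reader appears to list `<brokerZkPath>/topics/<topic>/partitions` in Kafka's ZooKeeper. That node only exists once the topic has been created, either explicitly with `kafka-topics.sh --create` or by Kafka when the topic receives its first message, as Benjamin observes. The helpers below only rebuild those paths from the thread's data. `BrokerPathModel`, `partitionsPath` and `partitionStatePath` are hypothetical names, and nothing here talks to ZooKeeper.

```java
// Rebuilds the Kafka metadata paths that appear in this thread.
// Helper names are hypothetical; nothing here connects to ZooKeeper.
final class BrokerPathModel {

    /** Node named in the NoNodeException: brokerZkPath + /topics/<topic>/partitions. */
    static String partitionsPath(String brokerZkPath, String topic) {
        return brokerZkPath + "/topics/" + topic + "/partitions";
    }

    /** Per-partition state node that Kafka maintains (see the zookeeper-shell output). */
    static String partitionStatePath(String brokerZkPath, String topic, int partition) {
        return partitionsPath(brokerZkPath, topic) + "/" + partition + "/state";
    }

    public static void main(String[] args) {
        // ZkHosts(..., "/brokers") with topic warehouse_prices: the path in the exception
        System.out.println(partitionsPath("/brokers", "warehouse_prices"));
        // The node Cristian inspected on the Kafka ZooKeeper
        System.out.println(partitionStatePath("/brokers", "bid_history", 0));
    }
}
```

If the broker path or topic name passed to `ZkHosts`/`SpoutConfig` no longer matches what Kafka registered after the path change, the computed node presumably will not exist. In that case the spout fails in `open` with this `NoNodeException`, as the stack trace shows.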