Bec I do see this error in the storm logs 5304 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x14d76b1ea54000c type:create cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for /brokers/rawWarehousePriceSpout
> On 21 May 2015, at 14:04, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote: > > Are you wrong > > zookeeperPath , // the root path in Zookeeper for the spout to store the > consumer offsets > > Is for you KafkaSpout find information about broker meta information ( > topics, partitions) > > You have to override > > SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history", "/brokers", > "kafka-spout"); > spoutconfig.zkServers > spoutconfig.zkPort > > Take a look a KafkaSpout.class and do a debug mode in this part. > > 73 Map stateConf = new HashMap(conf); > 74 List<String> zkServers = _spoutConfig.zkServers; > if (zkServers == null) { > zkServers = (List<String>) > conf.get(Config.STORM_ZOOKEEPER_SERVERS); > } > Integer zkPort = _spoutConfig.zkPort; > if (zkPort == null) { > zkPort = ((Number) > conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue(); > } > > > > > 2015-05-21 9:53 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com > <mailto:cuthbert....@gmail.com>>: > So when I run it out of eclipse in local mode I am pointing to the same > zookeeper as I am when it is deployed. Whatever way it still does not write > the offset. > > Only different I have is my spout is configured like so: > > BrokerHosts hosts = new ZkHosts(zookeeperQuorum); > > String zookeeperPath = KAFKA_STORM_DIR + "/" + topic; > SpoutConfig spoutConfig = new SpoutConfig( > hosts, > topic, // topic to read from > zookeeperPath , // the root path in Zookeeper > for the spout to store the consumer offsets > spoutId); // an id for this consumer for > storing the consumer offsets in Zookeeper > > //Check if we should be consuming messages from the beginning > spoutConfig.forceFromStart = consumeFromBeginning; > spoutConfig.maxOffsetBehind = Long.MAX_VALUE; > spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true; > spoutConfig.scheme = new SchemeAsMultiScheme(new > KafkaAnalyticsMessageDecoder()); > > >> On 21 May 2015, at 13:50, Cristian Makoto Sandiga <cmsand...@gmail.com >> <mailto:cmsand...@gmail.com>> wrote: >> >> I have the same problem when i have Storm in Local cluster mode. but is >> because, Local mode has a zookeeper embedded and every time that restart >> the offset is null and is the reason that you have >> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2 --> null. >> >> Try to run Local Mode, send some messages to spout, and then see Offset in >> Zookeeper (port 2000). >> >> Or use Zookeeper of kafka, override. >> >> spoutconfig.zkServers >> spoutconfig.zkPort >> >> >> 2015-05-21 9:39 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com >> <mailto:cuthbert....@gmail.com>>: >> Thanks, >> >> Yup kafka is creating them on startup when the topics gets its first msg. >> And from below the storm logs are never really committing the offset to >> zookeeper. >> >> Zookeeper kafka topics details >> >> [zk: 127.0.0.1:2181(CONNECTED) 41] ls >> /brokers/topics/warehouse_prices/partitions >> [2, 1, 0] >> [zk: 127.0.0.1:2181(CONNECTED) 42] >> >> Zookeeper Storm >> >> [zk: 127.0.0.1:2181(CONNECTED) 42] ls >> /kafkastorm/warehouse_prices/rawWarehousePriceSpout >> [] >> [zk: 127.0.0.1:2181(CONNECTED) 43] >> >> >> Storm logs >> >> 44325 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - >> Task [1/1] New partition managers: >> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >> partition=0}, >> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >> partition=1}, >> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >> partition=2}] >> 44491 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - Read partition information from: >> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0 --> null >> 44746 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - No partition information found, using configuration to determine offset >> 44746 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - Last commit offset from zookeeper: 0 >> 44747 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - Commit offset 0 is more than 9223372036854775807 behind, resetting to >> startOffsetTime=-2 >> 44747 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from >> offset 0 >> 44749 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - Read partition information from: >> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_1 --> null >> 44778 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - No partition information found, using configuration to determine offset >> 44778 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - Last commit offset from zookeeper: 0 >> 44778 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - Commit offset 0 is more than 9223372036854775807 behind, resetting to >> startOffsetTime=-2 >> 44779 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - Starting Kafka price-engine-demo-server.c.celertech-01.internal:1 from >> offset 0 >> 44781 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - Read partition information from: >> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2 --> null >> 44809 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - No partition information found, using configuration to determine offset >> 44809 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - Last commit offset from zookeeper: 0 >> 44810 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - Commit offset 0 is more than 9223372036854775807 behind, resetting to >> startOffsetTime=-2 >> 44810 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - Starting Kafka price-engine-demo-server.c.celertech-01.internal:2 from >> offset 0 >> 44810 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - >> Task [1/1] Finished refreshing >> 47438 [ProcessThread(sid:0 cport:-1):] INFO >> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level >> KeeperException when processing sessionid:0x14d7658db3c000c type:create >> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error >> Path:/kafkastorm/warehouse_prices/rawWarehousePriceSpout >> Error:KeeperErrorCode = NoNode for >> /kafkastorm/warehouse_prices/rawWarehousePriceSpout >> 104184 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils - No >> data found in Kafka Partition partition_0 >> 104828 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - >> Task [1/1] Refreshing partition manager connections >> 105005 [Thread-15-rawWarehousePriceSpout] INFO >> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: >> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092, >> 1=price-engine-demo-server.c.celertech-01.internal:9092, >> 2=price-engine-demo-server.c.celertech-01.internal:9092}} >> 105005 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - >> Task [1/1] assigned >> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >> partition=0}, >> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >> partition=1}, >> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >> partition=2}] >> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - >> Task [1/1] Deleted partition managers: [] >> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - >> Task [1/1] New partition managers: [] >> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - >> Task [1/1] Finished refreshing >> 164204 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils - No >> data found in Kafka Partition partition_0 >> 165063 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - >> Task [1/1] Refreshing partition manager connections >> 165240 [Thread-15-rawWarehousePriceSpout] INFO >> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: >> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092, >> 1=price-engine-demo-server.c.celertech-01.internal:9092, >> 2=price-engine-demo-server.c.celertech-01.internal:9092}} >> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - >> Task [1/1] assigned >> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >> partition=0}, >> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >> partition=1}, >> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >> partition=2}] >> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - >> Task [1/1] Deleted partition managers: [] >> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - >> Task [1/1] New partition managers: [] >> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - >> Task [1/1] Finished refreshing >> 224233 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils - No >> data found in Kafka Partition partition_0 >> 225308 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - >> Task [1/1] Refreshing partition manager connections >> >> >>> On 21 May 2015, at 13:28, Cristian Makoto Sandiga <cmsand...@gmail.com >>> <mailto:cmsand...@gmail.com>> wrote: >>> >>> Zookeeper create nothing when startup, you have to create your partitions >>> in kafka broker. >>> >>> bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic >>> click_history --replication-factor 1 --partitions 10 >>> >>> >>> >>> 2015-05-21 8:58 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com >>> <mailto:cuthbert....@gmail.com>>: >>> All, >>> >>> We changed or paths in zookeeper and we are now seeing >>> >>> java.lang.RuntimeException: java.lang.RuntimeException: >>> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = >>> NoNode for /brokers/topics/warehouse_prices/partitions >>> at >>> storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) >>> ~[storm-kafka-0.9.4.jar:0.9.4] >>> at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) >>> ~[storm-kafka-0.9.4.jar:0.9.4] >>> at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57) >>> ~[storm-kafka-0.9.4.jar:0.9.4] >>> at storm.kafka.KafkaSpout.open(KafkaSpout.java:87) >>> ~[storm-kafka-0.9.4.jar:0.9.4] >>> at >>> backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522) >>> ~[storm-core-0.9.4.jar:0.9.4] >>> at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) >>> ~[storm-core-0.9.4.jar:0.9.4] >>> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] >>> at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51] >>> >>> Should this not be discovered by the Spout on startup? >>> >> >> > >