Thanks for the reply. I thought that the third param in SpoutConfig was for

// the root path in Zookeeper for the spout to store the consumer offsets

So you are saying that it should point to the directory that Kafka creates for each topic? As in:

[zk: 127.0.0.1:2181(CONNECTED) 53] ls /brokers
[topics, ids]
[zk: 127.0.0.1:2181(CONNECTED) 54] ls /brokers/topics
[applicationServerLogs, warehouse_prices, applicationServerFixLogs]
[zk: 127.0.0.1:2181(CONNECTED) 55] ls /brokers/topics/warehouse_prices
[partitions]
[zk: 127.0.0.1:2181(CONNECTED) 56] ls /brokers/topics/warehouse_prices/partitions
[2, 1, 0]
[zk: 127.0.0.1:2181(CONNECTED) 57] ls /brokers/topics/warehouse_prices/partitions/1
[state]
[zk: 127.0.0.1:2181(CONNECTED) 58] ls /brokers/topics/warehouse_prices/partitions/1/state
[]
[zk: 127.0.0.1:2181(CONNECTED) 59]

Or should it be a Storm-only path that stores the offsets?

> On 21 May 2015, at 14:04, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>
> You are wrong.
>
> zookeeperPath , // the root path in Zookeeper for the spout to store the consumer offsets
>
> is for your KafkaSpout to find broker meta information (topics, partitions).
>
> You have to override
>
> SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history", "/brokers", "kafka-spout");
> spoutconfig.zkServers
> spoutconfig.zkPort
>
> Take a look at KafkaSpout.class and debug this part.
>
> Map stateConf = new HashMap(conf);
> List<String> zkServers = _spoutConfig.zkServers;
> if (zkServers == null) {
>     zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
> }
> Integer zkPort = _spoutConfig.zkPort;
> if (zkPort == null) {
>     zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
> }
>
> 2015-05-21 9:53 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
> So when I run it out of Eclipse in local mode I am pointing to the same Zookeeper as I am when it is deployed. Either way it still does not write the offset.
>
> The only difference I have is that my spout is configured like so:
>
> BrokerHosts hosts = new ZkHosts(zookeeperQuorum);
>
> String zookeeperPath = KAFKA_STORM_DIR + "/" + topic;
> SpoutConfig spoutConfig = new SpoutConfig(
>         hosts,
>         topic,          // topic to read from
>         zookeeperPath,  // the root path in Zookeeper for the spout to store the consumer offsets
>         spoutId);       // an id for this consumer for storing the consumer offsets in Zookeeper
>
> // Check if we should be consuming messages from the beginning
> spoutConfig.forceFromStart = consumeFromBeginning;
> spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
> spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
> spoutConfig.scheme = new SchemeAsMultiScheme(new KafkaAnalyticsMessageDecoder());
>
>> On 21 May 2015, at 13:50, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>
>> I have the same problem when I run Storm in local cluster mode, but that is because local mode has an embedded Zookeeper, and every time it restarts the offset is null. That is the reason you have
>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2 --> null.
>>
>> Try to run local mode, send some messages to the spout, and then look at the offset in Zookeeper (port 2000).
>>
>> Or use Kafka's Zookeeper by overriding:
>>
>> spoutconfig.zkServers
>> spoutconfig.zkPort
>>
>> 2015-05-21 9:39 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>> Thanks,
>>
>> Yup, Kafka is creating them on startup when the topic gets its first msg. And from below, the Storm logs are never really committing the offset to Zookeeper.
>>
>> Zookeeper kafka topics details
>>
>> [zk: 127.0.0.1:2181(CONNECTED) 41] ls /brokers/topics/warehouse_prices/partitions
>> [2, 1, 0]
>> [zk: 127.0.0.1:2181(CONNECTED) 42]
>>
>> Zookeeper Storm
>>
>> [zk: 127.0.0.1:2181(CONNECTED) 42] ls /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>> []
>> [zk: 127.0.0.1:2181(CONNECTED) 43]
>>
>> Storm logs
>>
>> 44325 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=1}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=2}]
>> 44491 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0 --> null
>> 44746 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
>> 44746 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>> 44747 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
>> 44747 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from offset 0
>> 44749 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_1 --> null
>> 44778 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
>> 44778 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>> 44778 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
>> 44779 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:1 from offset 0
>> 44781 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2 --> null
>> 44809 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
>> 44809 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>> 44810 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
>> 44810 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:2 from offset 0
>> 44810 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>> 47438 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x14d7658db3c000c type:create cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error Path:/kafkastorm/warehouse_prices/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>> 104184 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils - No data found in Kafka Partition partition_0
>> 104828 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
>> 105005 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092, 1=price-engine-demo-server.c.celertech-01.internal:9092, 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>> 105005 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=1}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=2}]
>> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
>> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
>> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>> 164204 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils - No data found in Kafka Partition partition_0
>> 165063 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092, 1=price-engine-demo-server.c.celertech-01.internal:9092, 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=1}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=2}]
>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>> 224233 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils - No data found in Kafka Partition partition_0
>> 225308 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
>>
>>> On 21 May 2015, at 13:28, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>
>>> Zookeeper creates nothing at startup; you have to create your partitions in the Kafka broker.
>>>
>>> bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic click_history --replication-factor 1 --partitions 10
>>>
>>> 2015-05-21 8:58 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>> All,
>>>
>>> We changed our paths in Zookeeper and we are now seeing
>>>
>>> java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/warehouse_prices/partitions
>>>     at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>     at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>     at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>     at storm.kafka.KafkaSpout.open(KafkaSpout.java:87) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>     at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522) ~[storm-core-0.9.4.jar:0.9.4]
>>>     at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.4.jar:0.9.4]
>>>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>     at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
>>>
>>> Should this not be discovered by the Spout on startup?
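
For reference, the two ZooKeeper locations discussed in this thread can be sketched in plain Java. This is a minimal sketch that only mirrors the path layout visible in the zkCli listings and Storm logs quoted in the thread; it does not call storm-kafka, and the class and method names (ZkPathSketch, brokerPartitionsPath, offsetPath) are hypothetical, made up for illustration.

```java
// Sketch only: reproduces the path strings seen in the thread's logs,
// not the actual storm-kafka implementation.
final class ZkPathSketch {

    // Path under which Kafka publishes partition metadata for a topic.
    // "/brokers" as the broker root is taken from the zkCli listing in the thread.
    static String brokerPartitionsPath(String brokerZkPath, String topic) {
        return brokerZkPath + "/topics/" + topic + "/partitions";
    }

    // Path from which the spout logs "Read partition information from: ...".
    // zkRoot corresponds to the third SpoutConfig argument, spoutId to the fourth.
    static String offsetPath(String zkRoot, String spoutId, int partition) {
        return zkRoot + "/" + spoutId + "/partition_" + partition;
    }

    public static void main(String[] args) {
        String topic = "warehouse_prices";
        String zkRoot = "/kafkastorm/" + topic; // KAFKA_STORM_DIR + "/" + topic in the thread

        // prints /brokers/topics/warehouse_prices/partitions
        System.out.println(brokerPartitionsPath("/brokers", topic));
        // prints /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0
        System.out.println(offsetPath(zkRoot, "rawWarehousePriceSpout", 0));
    }
}
```

If the logs in this thread are representative, the third SpoutConfig argument only feeds the second path, while the broker metadata is looked up separately under /brokers.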