So when I run it out of Eclipse in local mode I am pointing to the same ZooKeeper as when it is deployed. Either way, it still does not write the offset.
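For reference, this is the node I am checking in ZooKeeper. A minimal sketch, assuming the layout is root path / spout id / partition_N, as the quoted PartitionManager log lines suggest. `offsetNodePath` is a hypothetical helper written only for illustration, not a storm-kafka call, and the "/kafkastorm" value for KAFKA_STORM_DIR is inferred from the paths in those logs:

```java
public class OffsetPathSketch {

    // Hypothetical helper (not part of storm-kafka): joins the spout's
    // ZooKeeper root path, its consumer id and a partition number into
    // the node path that appears in the PartitionManager log lines.
    static String offsetNodePath(String zkRoot, String spoutId, int partition) {
        return zkRoot + "/" + spoutId + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // Assumed values, taken from the paths shown in the quoted logs.
        String kafkaStormDir = "/kafkastorm";
        String topic = "warehouse_prices";
        String spoutId = "rawWarehousePriceSpout";

        // Same concatenation as zookeeperPath in the spout configuration.
        String zkRoot = kafkaStormDir + "/" + topic;

        // The topic has three partitions (0, 1, 2).
        for (int partition = 0; partition < 3; partition++) {
            System.out.println(offsetNodePath(zkRoot, spoutId, partition));
        }
    }
}
```

Running `ls` or `get` on each printed path in zkCli, against the same ZooKeeper the spout uses, should show whether an offset was ever committed for that partition.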
The only difference is that my spout is configured like so:

BrokerHosts hosts = new ZkHosts(zookeeperQuorum);

String zookeeperPath = KAFKA_STORM_DIR + "/" + topic;
SpoutConfig spoutConfig = new SpoutConfig(
        hosts,
        topic,          // topic to read from
        zookeeperPath,  // the root path in Zookeeper for the spout to store the consumer offsets
        spoutId);       // an id for this consumer for storing the consumer offsets in Zookeeper

// Check if we should be consuming messages from the beginning
spoutConfig.forceFromStart = consumeFromBeginning;
spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
spoutConfig.scheme = new SchemeAsMultiScheme(new KafkaAnalyticsMessageDecoder());

> On 21 May 2015, at 13:50, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>
> I have the same problem when I run Storm in local cluster mode, but that is
> because local mode has an embedded ZooKeeper, and every time it restarts the
> offset is null. That is why you see
> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2 --> null.
>
> Try running in local mode, send some messages to the spout, and then look at
> the offset in ZooKeeper (port 2000).
>
> Or use Kafka's ZooKeeper by overriding:
>
> spoutconfig.zkServers
> spoutconfig.zkPort
>
>
> 2015-05-21 9:39 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
> Thanks,
>
> Yup, Kafka is creating them on startup when the topic gets its first message.
> And from the logs below, Storm is never really committing the offset to
> ZooKeeper.
>
> Zookeeper kafka topics details
>
> [zk: 127.0.0.1:2181(CONNECTED) 41] ls /brokers/topics/warehouse_prices/partitions
> [2, 1, 0]
> [zk: 127.0.0.1:2181(CONNECTED) 42]
>
> Zookeeper Storm
>
> [zk: 127.0.0.1:2181(CONNECTED) 42] ls /kafkastorm/warehouse_prices/rawWarehousePriceSpout
> []
> [zk: 127.0.0.1:2181(CONNECTED) 43]
>
>
> Storm logs
>
> 44325 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=1}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=2}]
> 44491 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0 --> null
> 44746 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
> 44746 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
> 44747 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
> 44747 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from offset 0
> 44749 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_1 --> null
> 44778 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
> 44778 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
> 44778 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
> 44779 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:1 from offset 0
> 44781 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2 --> null
> 44809 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
> 44809 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
> 44810 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
> 44810 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:2 from offset 0
> 44810 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
> 47438 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x14d7658db3c000c type:create cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error Path:/kafkastorm/warehouse_prices/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for /kafkastorm/warehouse_prices/rawWarehousePriceSpout
> 104184 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils - No data found in Kafka Partition partition_0
> 104828 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
> 105005 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092, 1=price-engine-demo-server.c.celertech-01.internal:9092, 2=price-engine-demo-server.c.celertech-01.internal:9092}}
> 105005 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=1}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=2}]
> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
> 164204 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils - No data found in Kafka Partition partition_0
> 165063 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092, 1=price-engine-demo-server.c.celertech-01.internal:9092, 2=price-engine-demo-server.c.celertech-01.internal:9092}}
> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=1}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=2}]
> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
> 224233 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils - No data found in Kafka Partition partition_0
> 225308 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
>
>
>> On 21 May 2015, at 13:28, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>
>> ZooKeeper creates nothing on startup; you have to create your partitions on
>> the Kafka broker.
>>
>> bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic click_history --replication-factor 1 --partitions 10
>>
>>
>>
>> 2015-05-21 8:58 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>> All,
>>
>> We changed our paths in ZooKeeper and we are now seeing
>>
>> java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/warehouse_prices/partitions
>>     at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) ~[storm-kafka-0.9.4.jar:0.9.4]
>>     at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) ~[storm-kafka-0.9.4.jar:0.9.4]
>>     at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57) ~[storm-kafka-0.9.4.jar:0.9.4]
>>     at storm.kafka.KafkaSpout.open(KafkaSpout.java:87) ~[storm-kafka-0.9.4.jar:0.9.4]
>>     at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522) ~[storm-core-0.9.4.jar:0.9.4]
>>     at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.4.jar:0.9.4]
>>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>     at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
>>
>> Should this not be discovered by the Spout on startup?