So when I run it out of eclipse in local mode I am pointing to the same 
zookeeper as I am when it is deployed. Whatever way it still does not write the 
offset.

Only different I have is my spout is configured like so:

BrokerHosts hosts = new ZkHosts(zookeeperQuorum);
                
                String zookeeperPath = KAFKA_STORM_DIR + "/" + topic;
                SpoutConfig spoutConfig = new SpoutConfig(
                                hosts,
                                topic, // topic to read from
                                zookeeperPath , // the root path in Zookeeper 
for the spout to store the consumer offsets
                                spoutId); // an id for this consumer for 
storing the consumer offsets in Zookeeper
        
                //Check if we should be consuming messages from the beginning
                spoutConfig.forceFromStart = consumeFromBeginning;
                spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
                spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
                spoutConfig.scheme = new SchemeAsMultiScheme(new 
KafkaAnalyticsMessageDecoder());


> On 21 May 2015, at 13:50, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
> 
> I have the same problem when i have Storm in Local cluster mode. but is 
> because, Local mode has a zookeeper embedded and every time that restart  the 
> offset is null  and is the reason that you have  
> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2  --> null.
> 
> Try to run Local Mode, send some messages to spout, and then see Offset in 
> Zookeeper (port 2000).
> 
> Or use Zookeeper of kafka, override.
> 
> spoutconfig.zkServers
> spoutconfig.zkPort
> 
> 
> 2015-05-21 9:39 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com 
> <mailto:cuthbert....@gmail.com>>:
> Thanks,
> 
> Yup kafka is creating them on startup when the topics gets its first msg. And 
> from below the storm logs are never really committing the offset to 
> zookeeper. 
> 
> Zookeeper kafka topics details
> 
> [zk: 127.0.0.1:2181(CONNECTED) 41] ls 
> /brokers/topics/warehouse_prices/partitions
> [2, 1, 0]
> [zk: 127.0.0.1:2181(CONNECTED) 42]
> 
> Zookeeper Storm
> 
> [zk: 127.0.0.1:2181(CONNECTED) 42] ls 
> /kafkastorm/warehouse_prices/rawWarehousePriceSpout
> []
> [zk: 127.0.0.1:2181(CONNECTED) 43]
> 
> 
> Storm logs
> 
> 44325 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
> Task [1/1] New partition managers: 
> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
> partition=0}, 
> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
> partition=1}, 
> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
> partition=2}]
> 44491 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
> Read partition information from: 
> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0  --> null
> 44746 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
> No partition information found, using configuration to determine offset
> 44746 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
> Last commit offset from zookeeper: 0
> 44747 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
> Commit offset 0 is more than 9223372036854775807 behind, resetting to 
> startOffsetTime=-2
> 44747 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
> Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from offset > 0
> 44749 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
> Read partition information from: 
> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_1  --> null
> 44778 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
> No partition information found, using configuration to determine offset
> 44778 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
> Last commit offset from zookeeper: 0
> 44778 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
> Commit offset 0 is more than 9223372036854775807 behind, resetting to 
> startOffsetTime=-2
> 44779 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
> Starting Kafka price-engine-demo-server.c.celertech-01.internal:1 from offset > 0
> 44781 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
> Read partition information from: 
> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2  --> null
> 44809 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
> No partition information found, using configuration to determine offset
> 44809 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
> Last commit offset from zookeeper: 0
> 44810 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
> Commit offset 0 is more than 9223372036854775807 behind, resetting to 
> startOffsetTime=-2
> 44810 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
> Starting Kafka price-engine-demo-server.c.celertech-01.internal:2 from offset > 0
> 44810 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
> Task [1/1] Finished refreshing
> 47438 [ProcessThread(sid:0 cport:-1):] INFO  
> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level 
> KeeperException when processing sessionid:0x14d7658db3c000c type:create 
> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error 
> Path:/kafkastorm/warehouse_prices/rawWarehousePriceSpout 
> Error:KeeperErrorCode = NoNode for 
> /kafkastorm/warehouse_prices/rawWarehousePriceSpout
> 104184 [Thread-15-rawWarehousePriceSpout] WARN  storm.kafka.KafkaUtils - No 
> data found in Kafka Partition partition_0
> 104828 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
> Task [1/1] Refreshing partition manager connections
> 105005 [Thread-15-rawWarehousePriceSpout] INFO  
> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: 
> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092,
>  1=price-engine-demo-server.c.celertech-01.internal:9092, 
> 2=price-engine-demo-server.c.celertech-01.internal:9092}}
> 105005 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.KafkaUtils - Task 
> [1/1] assigned 
> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
> partition=0}, 
> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
> partition=1}, 
> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
> partition=2}]
> 105006 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
> Task [1/1] Deleted partition managers: []
> 105006 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
> Task [1/1] New partition managers: []
> 105006 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
> Task [1/1] Finished refreshing
> 164204 [Thread-15-rawWarehousePriceSpout] WARN  storm.kafka.KafkaUtils - No 
> data found in Kafka Partition partition_0
> 165063 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
> Task [1/1] Refreshing partition manager connections
> 165240 [Thread-15-rawWarehousePriceSpout] INFO  
> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: 
> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092,
>  1=price-engine-demo-server.c.celertech-01.internal:9092, 
> 2=price-engine-demo-server.c.celertech-01.internal:9092}}
> 165240 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.KafkaUtils - Task 
> [1/1] assigned 
> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
> partition=0}, 
> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
> partition=1}, 
> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
> partition=2}]
> 165240 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
> Task [1/1] Deleted partition managers: []
> 165240 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
> Task [1/1] New partition managers: []
> 165240 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
> Task [1/1] Finished refreshing
> 224233 [Thread-15-rawWarehousePriceSpout] WARN  storm.kafka.KafkaUtils - No 
> data found in Kafka Partition partition_0
> 225308 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
> Task [1/1] Refreshing partition manager connections
> 
> 
>> On 21 May 2015, at 13:28, Cristian Makoto Sandiga <cmsand...@gmail.com 
>> <mailto:cmsand...@gmail.com>> wrote:
>> 
>> Zookeeper create nothing when startup, you have to create your partitions in 
>> kafka broker. 
>> 
>> bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic 
>> click_history --replication-factor 1 --partitions 10
>> 
>> 
>> 
>> 2015-05-21 8:58 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com 
>> <mailto:cuthbert....@gmail.com>>:
>> All,
>> 
>> We changed or paths in zookeeper and we are now seeing
>> 
>> java.lang.RuntimeException: java.lang.RuntimeException: 
>> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
>> NoNode for /brokers/topics/warehouse_prices/partitions
>>      at 
>> storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) 
>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>      at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) 
>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>      at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57) 
>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>      at storm.kafka.KafkaSpout.open(KafkaSpout.java:87) 
>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>      at 
>> backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522) 
>> ~[storm-core-0.9.4.jar:0.9.4]
>>      at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) 
>> ~[storm-core-0.9.4.jar:0.9.4]
>>      at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>      at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
>> 
>> Should this not be discovered by the Spout on startup? 
>> 
> 
> 

Reply via email to