Because I do see this error in the Storm logs:

5304 [ProcessThread(sid:0 cport:-1):] INFO  
org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level 
KeeperException when processing sessionid:0x14d76b1ea54000c type:create 
cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error 
Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for 
/brokers/rawWarehousePriceSpout
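
A minimal sketch of how that node path seems to be put together, assuming the spout builds it as root path + "/" + spout id + "/partition_N" (which is what the PartitionManager lines in the quoted logs further down suggest). The class and method names are made up for illustration and are not part of storm-kafka; the root, topic and spout id values are inferred from this thread.

```java
// Sketch only: mirrors the path layout visible in the logs in this thread,
// not the storm-kafka source itself. OffsetPaths is a hypothetical helper.
final class OffsetPaths {
    private OffsetPaths() {}

    /** Joins root path, spout id and partition number into one node path. */
    static String committedPath(String zkRoot, String spoutId, int partition) {
        return zkRoot + "/" + spoutId + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // Assumed values, inferred from the log output (KAFKA_STORM_DIR + "/" + topic).
        String zkRoot = "/kafkastorm" + "/" + "warehouse_prices";
        String spoutId = "rawWarehousePriceSpout";
        System.out.println(committedPath(zkRoot, spoutId, 0));

        // With "/brokers" passed as the root instead, the parent node becomes
        // /brokers/rawWarehousePriceSpout, which is the path named in the error.
        System.out.println(committedPath("/brokers", spoutId, 0));
    }
}
```

If that assumption holds, the path in the error is simply the root "/brokers" followed by the spout id, so the create is being attempted under the broker metadata tree rather than under /kafkastorm.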



> On 21 May 2015, at 14:04, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
> 
> I think you have this wrong.
> 
> zookeeperPath , // the root path in Zookeeper for the spout to store the 
> consumer offsets
> 
> That parameter is what your KafkaSpout uses to find broker metadata 
> (topics, partitions).
> 
> You have to override 
> 
> SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history", "/brokers", 
> "kafka-spout");
> spoutconfig.zkServers
> spoutconfig.zkPort 
> 
> Take a look at KafkaSpout.class and step through this part in debug mode.
> 
>         Map stateConf = new HashMap(conf);
>         List<String> zkServers = _spoutConfig.zkServers;
>         if (zkServers == null) {
>             zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
>         }
>         Integer zkPort = _spoutConfig.zkPort;
>         if (zkPort == null) {
>             zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
>         }
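
The fallback in that snippet can be tried out without a Storm cluster. Below is a minimal stand-alone sketch, assuming the topology config is a plain map; the class and method names are hypothetical, the two string keys stand in for Config.STORM_ZOOKEEPER_SERVERS and Config.STORM_ZOOKEEPER_PORT, and the host and port values are only examples.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: ZkFallbackSketch is a hypothetical class, not part of storm-kafka.
final class ZkFallbackSketch {
    // Stand-ins for Config.STORM_ZOOKEEPER_SERVERS and Config.STORM_ZOOKEEPER_PORT.
    static final String SERVERS_KEY = "storm.zookeeper.servers";
    static final String PORT_KEY = "storm.zookeeper.port";

    private ZkFallbackSketch() {}

    /** Uses the spout-level override if set, otherwise the topology-level servers. */
    @SuppressWarnings("unchecked")
    static List<String> resolveServers(List<String> override, Map<String, Object> conf) {
        return override != null ? override : (List<String>) conf.get(SERVERS_KEY);
    }

    /** Uses the spout-level override if set, otherwise the topology-level port. */
    static int resolvePort(Integer override, Map<String, Object> conf) {
        return override != null ? override : ((Number) conf.get(PORT_KEY)).intValue();
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(SERVERS_KEY, Arrays.asList("localhost")); // example value
        conf.put(PORT_KEY, 2000);                          // example value

        // No override: falls back to whatever the topology config carries.
        System.out.println(resolveServers(null, conf) + " " + resolvePort(null, conf));

        // Override set: the spout-level values win.
        System.out.println(resolveServers(Arrays.asList("127.0.0.1"), conf)
                + " " + resolvePort(2181, conf));
    }
}
```

The point of the sketch is only that leaving zkServers and zkPort unset means offsets go to whichever ZooKeeper the topology config points at.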
> 
> 
> 
> 
> 2015-05-21 9:53 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com 
> <mailto:cuthbert....@gmail.com>>:
> So when I run it out of Eclipse in local mode, I am pointing to the same 
> ZooKeeper as I am when it is deployed. Either way, it still does not write 
> the offset.
> 
> The only difference I have is that my spout is configured like so:
> 
> BrokerHosts hosts = new ZkHosts(zookeeperQuorum);
> 
> String zookeeperPath = KAFKA_STORM_DIR + "/" + topic;
> SpoutConfig spoutConfig = new SpoutConfig(
>         hosts,
>         topic,          // topic to read from
>         zookeeperPath,  // the root path in Zookeeper for the spout to store the consumer offsets
>         spoutId);       // an id for this consumer for storing the consumer offsets in Zookeeper
> 
> // Check if we should be consuming messages from the beginning
> spoutConfig.forceFromStart = consumeFromBeginning;
> spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
> spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
> spoutConfig.scheme = new SchemeAsMultiScheme(new KafkaAnalyticsMessageDecoder());
> 
> 
>> On 21 May 2015, at 13:50, Cristian Makoto Sandiga <cmsand...@gmail.com 
>> <mailto:cmsand...@gmail.com>> wrote:
>> 
>> I have the same problem when I run Storm in local cluster mode, but that is 
>> because local mode has an embedded ZooKeeper, and every time it restarts 
>> the offset is null. That is the reason you see 
>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2  --> null.
>> 
>> Try running in local mode, send some messages to the spout, and then look 
>> at the offset in ZooKeeper (port 2000).
>> 
>> Or use Kafka's ZooKeeper by overriding:
>> 
>> spoutconfig.zkServers
>> spoutconfig.zkPort
>> 
>> 
>> 2015-05-21 9:39 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com 
>> <mailto:cuthbert....@gmail.com>>:
>> Thanks,
>> 
>> Yup, Kafka is creating them on startup when the topic gets its first message. 
>> And as the Storm logs below show, the offset is never really committed to 
>> ZooKeeper. 
>> 
>> Zookeeper kafka topics details
>> 
>> [zk: 127.0.0.1:2181(CONNECTED) 41] ls 
>> /brokers/topics/warehouse_prices/partitions
>> [2, 1, 0]
>> [zk: 127.0.0.1:2181(CONNECTED) 42]
>> 
>> Zookeeper Storm
>> 
>> [zk: 127.0.0.1:2181(CONNECTED) 42] ls 
>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>> []
>> [zk: 127.0.0.1:2181(CONNECTED) 43]
>> 
>> 
>> Storm logs
>> 
>> 44325 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>> Task [1/1] New partition managers: 
>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>> partition=0}, 
>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>> partition=1}, 
>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>> partition=2}]
>> 44491 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - Read partition information from: 
>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0  --> null
>> 44746 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - No partition information found, using configuration to determine offset
>> 44746 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - Last commit offset from zookeeper: 0
>> 44747 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - Commit offset 0 is more than 9223372036854775807 behind, resetting to 
>> startOffsetTime=-2
>> 44747 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from 
>> offset 0
>> 44749 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - Read partition information from: 
>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_1  --> null
>> 44778 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - No partition information found, using configuration to determine offset
>> 44778 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - Last commit offset from zookeeper: 0
>> 44778 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - Commit offset 0 is more than 9223372036854775807 behind, resetting to 
>> startOffsetTime=-2
>> 44779 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - Starting Kafka price-engine-demo-server.c.celertech-01.internal:1 from 
>> offset 0
>> 44781 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - Read partition information from: 
>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2  --> null
>> 44809 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - No partition information found, using configuration to determine offset
>> 44809 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - Last commit offset from zookeeper: 0
>> 44810 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - Commit offset 0 is more than 9223372036854775807 behind, resetting to 
>> startOffsetTime=-2
>> 44810 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - Starting Kafka price-engine-demo-server.c.celertech-01.internal:2 from 
>> offset 0
>> 44810 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>> Task [1/1] Finished refreshing
>> 47438 [ProcessThread(sid:0 cport:-1):] INFO  
>> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level 
>> KeeperException when processing sessionid:0x14d7658db3c000c type:create 
>> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error 
>> Path:/kafkastorm/warehouse_prices/rawWarehousePriceSpout 
>> Error:KeeperErrorCode = NoNode for 
>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>> 104184 [Thread-15-rawWarehousePriceSpout] WARN  storm.kafka.KafkaUtils - No 
>> data found in Kafka Partition partition_0
>> 104828 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>> Task [1/1] Refreshing partition manager connections
>> 105005 [Thread-15-rawWarehousePriceSpout] INFO  
>> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: 
>> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092,
>>  1=price-engine-demo-server.c.celertech-01.internal:9092, 
>> 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>> 105005 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.KafkaUtils - 
>> Task [1/1] assigned 
>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>> partition=0}, 
>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>> partition=1}, 
>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>> partition=2}]
>> 105006 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>> Task [1/1] Deleted partition managers: []
>> 105006 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>> Task [1/1] New partition managers: []
>> 105006 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>> Task [1/1] Finished refreshing
>> 164204 [Thread-15-rawWarehousePriceSpout] WARN  storm.kafka.KafkaUtils - No 
>> data found in Kafka Partition partition_0
>> 165063 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>> Task [1/1] Refreshing partition manager connections
>> 165240 [Thread-15-rawWarehousePriceSpout] INFO  
>> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: 
>> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092,
>>  1=price-engine-demo-server.c.celertech-01.internal:9092, 
>> 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>> 165240 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.KafkaUtils - 
>> Task [1/1] assigned 
>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>> partition=0}, 
>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>> partition=1}, 
>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>> partition=2}]
>> 165240 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>> Task [1/1] Deleted partition managers: []
>> 165240 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>> Task [1/1] New partition managers: []
>> 165240 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>> Task [1/1] Finished refreshing
>> 224233 [Thread-15-rawWarehousePriceSpout] WARN  storm.kafka.KafkaUtils - No 
>> data found in Kafka Partition partition_0
>> 225308 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>> Task [1/1] Refreshing partition manager connections
>> 
>> 
>>> On 21 May 2015, at 13:28, Cristian Makoto Sandiga <cmsand...@gmail.com 
>>> <mailto:cmsand...@gmail.com>> wrote:
>>> 
>>> ZooKeeper creates nothing on startup; you have to create your partitions 
>>> on the Kafka broker. 
>>> 
>>> bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic click_history --replication-factor 1 --partitions 10
>>> 
>>> 
>>> 
>>> 2015-05-21 8:58 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com 
>>> <mailto:cuthbert....@gmail.com>>:
>>> All,
>>> 
>>> We changed our paths in ZooKeeper and we are now seeing:
>>> 
>>> java.lang.RuntimeException: java.lang.RuntimeException: 
>>> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
>>> NoNode for /brokers/topics/warehouse_prices/partitions
>>>     at 
>>> storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81)
>>>  ~[storm-kafka-0.9.4.jar:0.9.4]
>>>     at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) 
>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>     at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57) 
>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>     at storm.kafka.KafkaSpout.open(KafkaSpout.java:87) 
>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>     at 
>>> backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522) 
>>> ~[storm-core-0.9.4.jar:0.9.4]
>>>     at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) 
>>> ~[storm-core-0.9.4.jar:0.9.4]
>>>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>     at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
>>> 
>>> Should this not be discovered by the Spout on startup? 
>>> 
>> 
>> 
> 
> 
