Thanks for all your help. Why do you have to use the localhost Zookeeper? How
do you specify the Kafka broker?
> On 21 May 2015, at 15:37, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>
> Again, when it is a local topology in Eclipse you have to connect to
> localhost:2000 (the embedded Storm Zookeeper).
>
> 2015-05-21 11:28 GMT-03:00 Cristian Makoto Sandiga <cmsand...@gmail.com>:
> I don't know; I'm using BaseRichBolt.
>
> 2015-05-21 11:25 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>
> I am using BaseBasicBolt, as I was told that acks events automatically. Should
> we not use that?
>
>
>> On 21 May 2015, at 15:24, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>
>> Are you acking in your bolt? collector.ack(input);
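>> For context: BaseBasicBolt acks automatically after execute() returns, while
>> BaseRichBolt requires an explicit collector.ack(input). Below is a minimal
>> sketch of the manual pattern; the Tuple and OutputCollector types are
>> stand-ins for the Storm API, not the real classes, so it compiles without
>> storm-core:

```java
import java.util.ArrayList;
import java.util.List;

public class AckSketch {
    // Stand-in for Storm's Tuple (hypothetical, for illustration only).
    interface Tuple { String value(); }

    // Stand-in collector that records acks instead of talking to Storm.
    static class OutputCollector {
        final List<Tuple> acked = new ArrayList<>();
        void ack(Tuple input) { acked.add(input); }
    }

    // With BaseRichBolt-style bolts, execute() must ack every tuple itself;
    // a missing ack means the tuple is replayed after the message timeout.
    static class ManualAckBolt {
        void execute(Tuple input, OutputCollector collector) {
            // ... process input ...
            collector.ack(input); // explicit ack, as asked in the thread
        }
    }

    public static void main(String[] args) {
        OutputCollector collector = new OutputCollector();
        new ManualAckBolt().execute(() -> "price-tick", collector);
        System.out.println("acked=" + collector.acked.size());
    }
}
```

>> With BaseBasicBolt the equivalent ack happens for you, which is why the
>> question about collector.ack(input) only applies to BaseRichBolt.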
>>
>> 2015-05-21 11:19 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>> Okay so interesting:
>>
>> I have your configuration in my local Eclipse and a remote Zookeeper that
>> Kafka is connected to:
>>
>> ZkHosts zkhost = new ZkHosts("1.1.1.1:2181", "/brokers"); // /brokers -> kafka broker
>> SpoutConfig spoutconfig = new SpoutConfig("1.1.1.1:2181", "/prices", "/brokers", "rawWarehousePriceSpout");
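>> Note that SpoutConfig's first argument is the BrokerHosts object, not a
>> "host:port" string, and the second is the bare Kafka topic name. A hedged
>> sketch of the likely intent (storm-kafka 0.9.x API; host taken from the
>> snippet above, and assuming the topic is named prices without the leading
>> slash):

```java
// Fragment: pass the ZkHosts object to SpoutConfig, not a connect string.
ZkHosts zkhost = new ZkHosts("1.1.1.1:2181", "/brokers");
SpoutConfig spoutconfig =
        new SpoutConfig(zkhost, "prices", "/brokers", "rawWarehousePriceSpout");
```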
>>
>> Zookeeper:
>>
>> [zk: 127.0.0.1:2181(CONNECTED) 80] ls /brokers
>> [rawWarehousePriceSpout, topics, ids]
>> [zk: 127.0.0.1:2181(CONNECTED) 81] ls /brokers/rawWarehousePriceSpout
>> []
>> [zk: 127.0.0.1:2181(CONNECTED) 82]
>>
>>
>> Logs I see:
>>
>> 41994 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - Task
>> [1/1] assigned
>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>> partition=0},
>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>> partition=1},
>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>> partition=2}]
>> 41994 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator -
>> Task [1/1] Deleted partition managers: []
>> 41994 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator -
>> Task [1/1] New partition managers:
>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>> partition=0},
>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>> partition=1},
>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>> partition=2}]
>> 42094 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager
>> - Read partition information from:
>> /brokers/rawWarehousePriceSpout/partition_0 --> null
>> 42363 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager
>> - No partition information found, using configuration to determine offset
>> 42363 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager
>> - Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from
>> offset 425530
>> 42366 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager
>> - Read partition information from:
>> /brokers/rawWarehousePriceSpout/partition_1 --> null
>> 42400 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager
>> - No partition information found, using configuration to determine offset
>> 42400 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager
>> - Starting Kafka price-engine-demo-server.c.celertech-01.internal:1 from
>> offset 550715
>> 42401 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager
>> - Read partition information from:
>> /brokers/rawWarehousePriceSpout/partition_2 --> null
>> 42432 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager
>> - No partition information found, using configuration to determine offset
>> 42432 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager
>> - Starting Kafka price-engine-demo-server.c.celertech-01.internal:2 from
>> offset 439547
>> 42432 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator -
>> Task [1/1] Finished refreshing
>> 44652 [ProcessThread(sid:0 cport:-1):] INFO
>> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level
>> KeeperException when processing sessionid:0x14d76d4418d000c type:create
>> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error
>> Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for
>> /brokers/rawWarehousePriceSpout
>>
>>
>>
>>> On 21 May 2015, at 15:02, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>
>>> I'm sorry, I made a mistake.
>>>
>>> ZkHosts zkhost = new ZkHosts("localhost:2181", "/brokers"); // /brokers -> kafka broker
>>> SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history", "/brokers", "kafka-spout"); // storm -> zookeeper
>>>
>>> If you look, it is not an ERROR but an INFO; it usually appears when the
>>> directory structure does not exist.
>>>
>>> Take a look at the Kafka broker's Zookeeper (port 2181)
>>> ----------------------------------
>>>
>>> -- bin/zookeeper-shell.sh localhost:2181
>>>
>>> ls /brokers/topics/bid_history/partitions/0/state
>>> []
>>>
>>> get /brokers/topics/bid_history/partitions/0/state
>>> {"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
>>> cZxid = 0x113
>>> ctime = Tue May 19 18:19:24 BRT 2015
>>> mZxid = 0x113
>>> mtime = Tue May 19 18:19:24 BRT 2015
>>> pZxid = 0x113
>>> cversion = 0
>>> dataVersion = 0
>>> aclVersion = 0
>>> ephemeralOwner = 0x0
>>> dataLength = 72
>>> numChildren = 0
>>>
>>>
>>> Take a look at Storm's embedded Zookeeper (port 2000)
>>> ----------------------------------
>>> Start the topology, send a message, and then look:
>>>
>>> bin/zookeeper-shell.sh localhost:2000
>>> Connecting to localhost:2000
>>> Welcome to ZooKeeper!
>>> JLine support is disabled
>>>
>>> ls /
>>> [storm, brokers, zookeeper]
>>> ls /brokers
>>> [kafka-spout]
>>> ls /brokers/kafka-spout
>>> [partition_0]
>>> ls /brokers/kafka-spout/partition_0
>>> []
>>> get /brokers/kafka-spout/partition_0
>>> {"topology":{"id":"a9be1962-6b4e-4ed4-ae68-155a1948a1f6","name":"consolidate_reports"},"offset":4426029,"partition":0,"broker":{"host":"localhost","port":9092},"topic":"bid_history"}
>>> cZxid = 0x50
>>> ctime = Thu May 21 11:00:48 BRT 2015
>>> mZxid = 0x50
>>> mtime = Thu May 21 11:00:48 BRT 2015
>>> pZxid = 0x50
>>> cversion = 0
>>> dataVersion = 0
>>> aclVersion = 0
>>> ephemeralOwner = 0x0
>>> dataLength = 182
>>> numChildren = 0
>>>
>>>
>>> 2015-05-21 10:43 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>> Because I do see this error in the Storm logs:
>>>
>>> 5304 [ProcessThread(sid:0 cport:-1):] INFO
>>> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level
>>> KeeperException when processing sessionid:0x14d76b1ea54000c type:create
>>> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error
>>> Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for
>>> /brokers/rawWarehousePriceSpout
>>>
>>>
>>>
>>>> On 21 May 2015, at 14:04, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>>
>>>> You have it wrong.
>>>>
>>>> zookeeperPath, // the root path in Zookeeper for the spout to store the consumer offsets
>>>>
>>>> It is for your KafkaSpout to find broker meta information (topics, partitions).
>>>>
>>>> You have to override
>>>>
>>>> SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history",
>>>> "/brokers", "kafka-spout");
>>>> spoutconfig.zkServers
>>>> spoutconfig.zkPort
>>>>
>>>> Take a look at KafkaSpout.java and step through this part in debug mode:
>>>>
>>>> Map stateConf = new HashMap(conf);
>>>> List<String> zkServers = _spoutConfig.zkServers;
>>>> if (zkServers == null) {
>>>>     zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
>>>> }
>>>> Integer zkPort = _spoutConfig.zkPort;
>>>> if (zkPort == null) {
>>>>     zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
>>>> }
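>>>> That fallback can be reproduced standalone. A sketch with plain
>>>> collections, writing the config keys out literally ("storm.zookeeper.servers"
>>>> and "storm.zookeeper.port" are the values behind Config.STORM_ZOOKEEPER_SERVERS
>>>> and Config.STORM_ZOOKEEPER_PORT), so it runs without storm-core:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ZkFallbackSketch {
    // Mirrors the KafkaSpout.open() logic quoted above: an explicit
    // zkServers/zkPort wins; otherwise the spout falls back to Storm's own
    // Zookeeper -- in local mode, the embedded one on port 2000.
    static List<String> resolveZkServers(List<String> spoutZkServers, Map<String, Object> conf) {
        if (spoutZkServers != null) {
            return spoutZkServers;
        }
        @SuppressWarnings("unchecked")
        List<String> stormZk = (List<String>) conf.get("storm.zookeeper.servers");
        return stormZk;
    }

    static int resolveZkPort(Integer spoutZkPort, Map<String, Object> conf) {
        if (spoutZkPort != null) {
            return spoutZkPort;
        }
        return ((Number) conf.get("storm.zookeeper.port")).intValue();
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("storm.zookeeper.servers", Arrays.asList("localhost"));
        conf.put("storm.zookeeper.port", 2000);

        // No override: offsets would go to the embedded local-mode Zookeeper.
        System.out.println(resolveZkServers(null, conf) + ":" + resolveZkPort(null, conf));
        // Override set: offsets would go to Kafka's Zookeeper instead.
        System.out.println(resolveZkServers(Arrays.asList("1.1.1.1"), conf)
                + ":" + resolveZkPort(2181, conf));
    }
}
```

>>>> This is why overriding spoutconfig.zkServers and spoutconfig.zkPort
>>>> redirects where the offsets are committed.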
>>>>
>>>>
>>>>
>>>>
>>>> 2015-05-21 9:53 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>> So when I run it out of Eclipse in local mode I am pointing to the same
>>>> Zookeeper as when it is deployed. Either way it still does not write the
>>>> offset.
>>>>
>>>> The only difference I have is that my spout is configured like so:
>>>>
>>>> BrokerHosts hosts = new ZkHosts(zookeeperQuorum);
>>>>
>>>> String zookeeperPath = KAFKA_STORM_DIR + "/" + topic;
>>>> SpoutConfig spoutConfig = new SpoutConfig(
>>>>         hosts,
>>>>         topic,         // topic to read from
>>>>         zookeeperPath, // the root path in Zookeeper for the spout to store the consumer offsets
>>>>         spoutId);      // an id for this consumer for storing the consumer offsets in Zookeeper
>>>>
>>>> // Check if we should be consuming messages from the beginning
>>>> spoutConfig.forceFromStart = consumeFromBeginning;
>>>> spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
>>>> spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
>>>> spoutConfig.scheme = new SchemeAsMultiScheme(new KafkaAnalyticsMessageDecoder());
>>>>
>>>>
>>>>> On 21 May 2015, at 13:50, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>>>
>>>>> I have the same problem when I run Storm in local cluster mode, but that is
>>>>> because local mode has an embedded Zookeeper, and every time it restarts the
>>>>> offset is gone; that is why you see
>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2 --> null.
>>>>>
>>>>> Try running in local mode, send some messages to the spout, and then check
>>>>> the offset in Zookeeper (port 2000).
>>>>>
>>>>> Or use Kafka's Zookeeper by overriding:
>>>>>
>>>>> spoutconfig.zkServers
>>>>> spoutconfig.zkPort
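>>>>> A sketch of that override (the zkServers/zkPort fields are public on
>>>>> SpoutConfig in storm-kafka 0.9.x; the topic, root, and id are taken from
>>>>> the earlier example), so offsets are committed to Kafka's Zookeeper on
>>>>> 2181 instead of the embedded local-mode one on 2000:

```java
// Fragment: direct offset commits to Kafka's Zookeeper.
SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history", "/brokers", "kafka-spout");
spoutconfig.zkServers = Arrays.asList("localhost"); // Kafka's Zookeeper host(s)
spoutconfig.zkPort = 2181;                          // Kafka's Zookeeper port
```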
>>>>>
>>>>>
>>>>> 2015-05-21 9:39 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>>> Thanks,
>>>>>
>>>>> Yup, Kafka is creating them on startup when the topic gets its first message.
>>>>> And from below, the Storm logs show the offset is never really committed to
>>>>> Zookeeper.
>>>>>
>>>>> Zookeeper Kafka topic details
>>>>>
>>>>> [zk: 127.0.0.1:2181(CONNECTED) 41] ls
>>>>> /brokers/topics/warehouse_prices/partitions
>>>>> [2, 1, 0]
>>>>> [zk: 127.0.0.1:2181(CONNECTED) 42]
>>>>>
>>>>> Zookeeper Storm
>>>>>
>>>>> [zk: 127.0.0.1:2181(CONNECTED) 42] ls
>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>>>> []
>>>>> [zk: 127.0.0.1:2181(CONNECTED) 43]
>>>>>
>>>>>
>>>>> Storm logs
>>>>>
>>>>> 44325 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] New partition managers:
>>>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> partition=0},
>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> partition=1},
>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> partition=2}]
>>>>> 44491 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Read partition information from:
>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0 --> null
>>>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - No partition information found, using
>>>>> configuration to determine offset
>>>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Commit offset 0 is more than
>>>>> 9223372036854775807 behind, resetting to startOffsetTime=-2
>>>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Starting Kafka
>>>>> price-engine-demo-server.c.celertech-01.internal:0 from offset 0
>>>>> 44749 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Read partition information from:
>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_1 --> null
>>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - No partition information found, using
>>>>> configuration to determine offset
>>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Commit offset 0 is more than
>>>>> 9223372036854775807 behind, resetting to startOffsetTime=-2
>>>>> 44779 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Starting Kafka
>>>>> price-engine-demo-server.c.celertech-01.internal:1 from offset 0
>>>>> 44781 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Read partition information from:
>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2 --> null
>>>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - No partition information found, using
>>>>> configuration to determine offset
>>>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Commit offset 0 is more than
>>>>> 9223372036854775807 behind, resetting to startOffsetTime=-2
>>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Starting Kafka
>>>>> price-engine-demo-server.c.celertech-01.internal:2 from offset 0
>>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] Finished refreshing
>>>>> 47438 [ProcessThread(sid:0 cport:-1):] INFO
>>>>> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level
>>>>> KeeperException when processing sessionid:0x14d7658db3c000c type:create
>>>>> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error
>>>>> Path:/kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>>>> Error:KeeperErrorCode = NoNode for
>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>>>> 104184 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils -
>>>>> No data found in Kafka Partition partition_0
>>>>> 104828 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] Refreshing partition manager connections
>>>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper:
>>>>> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> 1=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>>>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils -
>>>>> Task [1/1] assigned
>>>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> partition=0},
>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> partition=1},
>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> partition=2}]
>>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] Deleted partition managers: []
>>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] New partition managers: []
>>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] Finished refreshing
>>>>> 164204 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils -
>>>>> No data found in Kafka Partition partition_0
>>>>> 165063 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] Refreshing partition manager connections
>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper:
>>>>> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> 1=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils -
>>>>> Task [1/1] assigned
>>>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> partition=0},
>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> partition=1},
>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> partition=2}]
>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] Deleted partition managers: []
>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] New partition managers: []
>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] Finished refreshing
>>>>> 224233 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils -
>>>>> No data found in Kafka Partition partition_0
>>>>> 225308 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] Refreshing partition manager connections
>>>>>
>>>>>
>>>>>> On 21 May 2015, at 13:28, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>>>>
>>>>>> Zookeeper creates nothing at startup; you have to create your partitions on
>>>>>> the Kafka broker.
>>>>>>
>>>>>> bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic
>>>>>> click_history --replication-factor 1 --partitions 10
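>>>>>> To confirm the topic and its partitions exist afterwards, a describe call
>>>>>> can be used (same Kafka CLI layout assumed; topic name from the example
>>>>>> above):

```shell
# Fragment: list the topic's partitions, leaders, and ISR via Zookeeper.
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic click_history
```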
>>>>>>
>>>>>>
>>>>>>
>>>>>> 2015-05-21 8:58 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>>>> All,
>>>>>>
>>>>>> We changed our paths in Zookeeper and we are now seeing:
>>>>>>
>>>>>> java.lang.RuntimeException: java.lang.RuntimeException:
>>>>>> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode =
>>>>>> NoNode for /brokers/topics/warehouse_prices/partitions
>>>>>> at
>>>>>> storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81)
>>>>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>> at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42)
>>>>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>> at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57)
>>>>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>> at storm.kafka.KafkaSpout.open(KafkaSpout.java:87)
>>>>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>> at
>>>>>> backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522)
>>>>>> ~[storm-core-0.9.4.jar:0.9.4]
>>>>>> at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461)
>>>>>> ~[storm-core-0.9.4.jar:0.9.4]
>>>>>> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>>> at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
>>>>>>
>>>>>> Should this not be discovered by the Spout on startup?
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
>
>