I am only creating one partition in code here:

    GlobalPartitionInformation hostsAndPartitions = new GlobalPartitionInformation();
    hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
    BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);

I hope that answers your question. I am new to both Storm and Kafka, so I am not sure exactly how it works.
If I am understanding you correctly, the line you told me to add in the first email should work because I am only creating one partition?

    config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

Thanks again for the help :-)
David
Date: Wed, 30 Mar 2016 15:36:19 +0530
Subject: Re: Storm KafkaSpout Integration
From: dkira...@aadhya-analytics.com
To: user@storm.apache.org


Hi David,

Could you tell me how many partitions you have?
The statement I gave you is the default. If you are running with multiple partitions, make sure you use the same number; e.g. if you are running with two partitions, change the number to 2 in the statement:

    config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2);

Best regards,
K. Sai Dilip Reddy

On Wed, Mar 30, 2016 at 3:00 PM, david kavanagh <david_...@hotmail.com> wrote:



Thanks for the reply!
I added the line as you suggested, but unfortunately there is still no difference. I am just guessing at this stage, but judging by the output below, it seems to be something to do with the partitioning or the offset. The warnings start by saying that there are more tasks than partitions. Task 1 is assigned the partition that is created in the code (the 'Task [1/10] assigned [Partition{host=127.0.0.1:9092, partition=0}]' line), and the rest of the tasks are not assigned any partitions. Eventually it states:
'Read partition information from: /twitter/twitter-topic-id/partition_0  --> null'
So it seems like it is not reading data from Kafka at all. I really don't understand what is going on here. Any ideas?

Kind Regards
David
--------------------------------------------------
Storm Output:
Thread-9-print] INFO  backtype.storm.daemon.executor - Prepared bolt print:(2)
32644 [Thread-11-words] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
32685 [Thread-19-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32686 [Thread-19-words] WARN  storm.kafka.KafkaUtils - Task [5/10] no partitions assigned
32686 [Thread-17-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32686 [Thread-15-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32686 [Thread-17-words] WARN  storm.kafka.KafkaUtils - Task [4/10] no partitions assigned
32686 [Thread-15-words] WARN  storm.kafka.KafkaUtils - Task [3/10] no partitions assigned
32686 [Thread-11-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32686 [Thread-11-words] INFO  storm.kafka.KafkaUtils - Task [1/10] assigned [Partition{host=127.0.0.1:9092, partition=0}]
32687 [Thread-29-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32697 [Thread-19-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
32697 [Thread-25-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
32697 [Thread-29-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
32697 [Thread-13-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
32697 [Thread-27-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
32697 [Thread-15-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
32689 [Thread-19-words] INFO  backtype.storm.daemon.executor - Opened spout words:(7)
32689 [Thread-25-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32689 [Thread-15-words] INFO  backtype.storm.daemon.executor - Opened spout words:(5)
32738 [Thread-25-words] WARN  storm.kafka.KafkaUtils - Task [8/10] no partitions assigned
32738 [Thread-25-words] INFO  backtype.storm.daemon.executor - Opened spout words:(10)
32689 [Thread-17-words] INFO  backtype.storm.daemon.executor - Opened spout words:(6)
32688 [Thread-13-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32688 [Thread-21-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32739 [Thread-21-words] WARN  storm.kafka.KafkaUtils - Task [6/10] no partitions assigned
32739 [Thread-21-words] INFO  backtype.storm.daemon.executor - Opened spout words:(8)
32687 [Thread-27-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32740 [Thread-27-words] WARN  storm.kafka.KafkaUtils - Task [9/10] no partitions assigned
32740 [Thread-27-words] INFO  backtype.storm.daemon.executor - Opened spout words:(11)
32687 [Thread-23-words] WARN  storm.kafka.KafkaUtils - there are more tasks than partitions (tasks: 10; partitions: 1), some tasks will be idle
32740 [Thread-23-words] WARN  storm.kafka.KafkaUtils - Task [7/10] no partitions assigned
32736 [Thread-29-words] WARN  storm.kafka.KafkaUtils - Task [10/10] no partitions assigned
32742 [Thread-17-words] INFO  backtype.storm.daemon.executor - Activating spout words:(6)
32872 [Thread-29-words] INFO  backtype.storm.daemon.executor - Opened spout words:(12)
32742 [Thread-25-words] INFO  backtype.storm.daemon.executor - Activating spout words:(10)
32742 [Thread-21-words] INFO  backtype.storm.daemon.executor - Activating spout words:(8)
32742 [Thread-27-words] INFO  backtype.storm.daemon.executor - Activating spout words:(11)
32742 [Thread-19-words] INFO  backtype.storm.daemon.executor - Activating spout words:(7)
32741 [Thread-15-words] INFO  backtype.storm.daemon.executor - Activating spout words:(5)
32740 [Thread-13-words] WARN  storm.kafka.KafkaUtils - Task [2/10] no partitions assigned
32873 [Thread-29-words] INFO  backtype.storm.daemon.executor - Activating spout words:(12)
32872 [Thread-23-words] INFO  backtype.storm.daemon.executor - Opened spout words:(9)
32873 [Thread-13-words] INFO  backtype.storm.daemon.executor - Opened spout words:(4)
32873 [Thread-23-words] INFO  backtype.storm.daemon.executor - Activating spout words:(9)
32873 [Thread-13-words] INFO  backtype.storm.daemon.executor - Activating spout words:(4)
37756 [Thread-23-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
37757 [Thread-17-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
37757 [Thread-21-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
37757 [Thread-11-words-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
37773 [Thread-11-words] INFO  storm.kafka.PartitionManager - Read partition information from: /twitter/twitter-topic-id/partition_0  --> null
37915 [Thread-11-words] INFO  storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
37915 [Thread-11-words] INFO  storm.kafka.PartitionManager - Starting Kafka 127.0.0.1:0 from offset 185
37916 [Thread-11-words] INFO  backtype.storm.daemon.executor - Opened spout words:(3)
37917 [Thread-11-words] INFO  backtype.storm.daemon.executor - Activating spout words:(3)
62005 [Thread-11-words] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
62013 [Thread-13-words] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
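The "more tasks than partitions" warnings above follow from the spout's parallelism hint of 10 (builder.setSpout("words", ..., 10)) against the single partition registered in GlobalPartitionInformation. As a rough illustration, the plain-Java sketch below (no Storm dependency; the class PartitionAssignmentSketch and its helper partitionsForTask are hypothetical) assumes the round-robin rule that storm-kafka's KafkaUtils appears to use, where partition i goes to the task whose index equals i modulo the task count, and prints which of the ten tasks would be idle.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Storm-free model of how a Kafka spout's tasks split partitions.
class PartitionAssignmentSketch {

    // Assumed rule: task taskIndex takes partitions taskIndex, taskIndex + totalTasks, ...
    static List<Integer> partitionsForTask(int taskIndex, int totalTasks, int numPartitions) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = taskIndex; p < numPartitions; p += totalTasks) {
            assigned.add(p);
        }
        return assigned;
    }

    public static void main(String[] args) {
        int totalTasks = 10;   // parallelism hint given to builder.setSpout("words", ..., 10)
        int numPartitions = 1; // only partition 0 is added to GlobalPartitionInformation
        if (numPartitions < totalTasks) {
            System.out.println("more tasks than partitions, some tasks will be idle");
        }
        for (int task = 0; task < totalTasks; task++) {
            List<Integer> parts = partitionsForTask(task, totalTasks, numPartitions);
            // Printed 1-based, like "Task [1/10]" in the Storm log
            System.out.println("Task [" + (task + 1) + "/" + totalTasks + "] -> "
                    + (parts.isEmpty() ? "idle" : "partitions " + parts));
        }
    }
}
```

With one partition, only task 1 gets work. The nine idle tasks cost some threads but do not stop task 1 from reading, which matches the "Task [1/10] assigned" line; passing 1 as the parallelism hint would silence the warnings.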

Date: Wed, 30 Mar 2016 10:10:54 +0530
Subject: Re: Storm KafkaSpout Integration
From: dkira...@aadhya-analytics.com
To: user@storm.apache.org

Hi David,

I think everything is fine, but you are missing a statement:

    config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

Add it after the line config.setDebug(true);

Best regards,
K. Sai Dilip Reddy
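For reference, Storm documents TOPOLOGY_MAX_SPOUT_PENDING as the maximum number of tuples that may be pending (emitted but not yet acked or failed) on a single spout task, and notes that it has no effect on unreliable spouts that emit without a message ID. The plain-Java model below (the class MaxPendingSketch is hypothetical, with no Storm dependency) sketches that throttling with a limit of 1.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of one spout task throttled by a max-pending limit.
class MaxPendingSketch {
    private final int maxPending;                            // plays the role of TOPOLOGY_MAX_SPOUT_PENDING
    private final Deque<Long> pending = new ArrayDeque<>();  // emitted but not yet acked/failed
    private long nextId = 0;

    MaxPendingSketch(int maxPending) {
        this.maxPending = maxPending;
    }

    // In Storm the executor stops calling nextTuple() while the limit is reached;
    // here tryEmit() simply refuses to emit.
    boolean tryEmit() {
        if (pending.size() >= maxPending) {
            return false; // throttled: too many tuples in flight
        }
        pending.addLast(nextId++);
        return true;
    }

    // Acking (or failing) a tuple frees one slot; this sketch always acks the oldest.
    void ackOldest() {
        pending.pollFirst();
    }

    int inFlight() {
        return pending.size();
    }

    public static void main(String[] args) {
        MaxPendingSketch spout = new MaxPendingSketch(1);
        System.out.println(spout.tryEmit()); // true: first tuple goes out
        System.out.println(spout.tryEmit()); // false: one tuple is already pending
        spout.ackOldest();
        System.out.println(spout.tryEmit()); // true: the ack freed a slot
    }
}
```

Acking strictly in FIFO order is a simplification; real acks can arrive in any order, but the limit is still a per-task count of in-flight tuples.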

On Tue, Mar 29, 2016 at 10:03 PM, david kavanagh <david_...@hotmail.com> wrote:



Hi all,
I am currently trying to use TestTopologyStaticHosts to connect the KafkaSpout to a Kafka topic. I have a ZooKeeper instance and a Kafka instance running on my localhost. I have a topic named "twitter-topic" that has some tweets in it. This is all working as expected: I can run the consumer in the terminal and it returns the tweets. I want to use the KafkaSpout to connect to the Kafka topic and pull the tweets into a topology. I have been working on this for a few days now with no success.
So far I have learned that when Storm is run in local mode, it uses an in-memory ZooKeeper on port 2000, which would not allow it to connect to the Kafka topic. I have tried to get around this using the following syntax that I found online:

    LocalCluster cluster = new LocalCluster("localhost", new Long(2181));

It is still not working, but it does seem to be connecting to Kafka, as it gives a 'closed socket connection' message when I cancel the operation (after it fails and hangs). The Storm output also says it is connected to localhost 2181, so it seems to be getting that far. I have included the full output from Storm in an attached txt file.
Here is the code I am using in the TestTopologyStaticHosts class:
    public static void main(String[] args) throws Exception {
        //String zkConnString = "localhost:2181";
        GlobalPartitionInformation hostsAndPartitions = new GlobalPartitionInformation();
        hostsAndPartitions.addPartition(0, new Broker("127.0.0.1", 9092));
        BrokerHosts brokerHosts = new StaticHosts(hostsAndPartitions);
        // BrokerHosts brokerHosts = new ZkHosts(zkConnString, "/brokers");

        SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "twitter-topic", "/twitter", "twitter-topic-id");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        //kafkaConfig.forceStartOffsetTime(-2);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("words", new KafkaSpout(kafkaConfig), 10);
        builder.setBolt("print", new PrinterBolt()).shuffleGrouping("words");

        LocalCluster cluster = new LocalCluster("localhost", new Long(2181));
        Config config = new Config();
        config.setDebug(true);
        // config.put("storm.zookeeper.servers", "localhost");
        // config.put("storm.zookeeper.port", "2181");
        cluster.submitTopology("kafka-test", config, builder.createTopology());

        Thread.sleep(600000);
    }
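The commented-out forceStartOffsetTime(-2) line and the log entries "partition_0 --> null" and "No partition information found, using configuration to determine offset" are about where the spout starts reading. In Kafka's offset API, -2 means the earliest available offset and -1 the latest. The plain-Java sketch below (class and method names are hypothetical) models that choice under an assumption worth checking against the storm-kafka source for the version in use: in at least some releases, when nothing is stored in ZooKeeper and the force flag is off, the spout falls back to the latest offset rather than the configured start time. The real PartitionManager logic has more cases than this.

```java
// Hypothetical model of how a Kafka spout might pick its first offset.
// -2 and -1 mirror Kafka's OffsetRequest.EarliestTime() and LatestTime().
class StartOffsetSketch {
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    // committed: offset previously stored in ZooKeeper, or null if none (the "--> null" case)
    // forceFromStart: whether the configured startOffsetTime should be honored
    // earliest/latest: first offset still on the broker and the current log-end offset
    static long chooseStartOffset(Long committed, boolean forceFromStart,
                                  long startOffsetTime, long earliest, long latest) {
        if (committed != null && !forceFromStart) {
            return committed; // resume from the stored offset
        }
        // Assumption (version-dependent): without the force flag the fallback is LATEST_TIME
        long time = forceFromStart ? startOffsetTime : LATEST_TIME;
        return time == EARLIEST_TIME ? earliest : latest;
    }

    public static void main(String[] args) {
        long earliest = 0L;  // illustrative value, not taken from the log
        long latest = 185L;  // the offset reported in the log
        System.out.println(chooseStartOffset(null, false, EARLIEST_TIME, earliest, latest)); // 185
        System.out.println(chooseStartOffset(null, true, EARLIEST_TIME, earliest, latest));  // 0
        System.out.println(chooseStartOffset(42L, false, EARLIEST_TIME, earliest, latest));  // 42
    }
}
```

If that assumption holds here, "from offset 185" would be the end of the topic, so tweets written before the topology started would be skipped and only new ones read. The relevant SpoutConfig fields (startOffsetTime, and forceFromStart or, in later releases, ignoreZkOffsets) differ between storm-kafka versions.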
Judging by the output, it seems that there is a problem with connecting to the Kafka partitions. I have tried many different things to get it to work, but no luck. I have also been looking at using the KafkaSpoutTestTopology class, but it expects arguments including 'dockerIp', which I don't understand.
Should I be using Storm in local mode? Should I be using the TestTopologyStaticHosts class, or would the KafkaSpoutTestTopology class be better?
Any help at all would be greatly appreciated, because I am really stuck.
Kind Regards
David Kavanagh