[ 
https://issues.apache.org/jira/browse/KAFKA-973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13709531#comment-13709531
 ] 

Joel Koshy commented on KAFKA-973:
----------------------------------

Can you try sending more messages? The default partitioner is random, so all 
partitions should receive messages as long as you send enough of them. With 
only a handful (three, say), it is quite possible for every message to end up 
on the same partition.
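
To put rough numbers on this, here is a minimal sketch. It assumes each message
is assigned to a partition independently and uniformly at random, which is a
simplification and may not match exactly what the 0.8 producer does internally.
The class PartitionSkew and its methods are hypothetical names made up for
illustration; they are not part of Kafka.

```java
import java.util.Random;

// Hypothetical helper, not part of Kafka. It models a producer that picks a
// partition independently and uniformly at random for every message.
class PartitionSkew {

    // Exact probability that all `messages` land on a single partition.
    // The first message can go anywhere; each later one must match it.
    static double probAllOnOnePartition(int messages, int partitions) {
        if (messages < 1 || partitions < 1) {
            throw new IllegalArgumentException("messages and partitions must be >= 1");
        }
        return Math.pow(1.0 / partitions, messages - 1);
    }

    // Simulates one run and returns how many messages each partition received.
    static int[] simulate(int messages, int partitions, Random rng) {
        int[] counts = new int[partitions];
        for (int i = 0; i < messages; i++) {
            counts[rng.nextInt(partitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int partitions = 2; // matches the topic described in the issue
        for (int n : new int[] {2, 3, 10, 100}) {
            System.out.printf("%d messages: P(all on one partition) = %.6f%n",
                    n, probAllOnOnePartition(n, partitions));
        }
        int[] counts = simulate(1000, partitions, new Random());
        System.out.println("1000 simulated messages: partition 0 got " + counts[0]
                + ", partition 1 got " + counts[1]);
    }
}
```

Under that assumption, two messages on a two-partition topic share a partition
half of the time and three messages do so a quarter of the time, so a test
with only a few messages cannot tell you whether partitioning is working.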
                
> Messages From Producer Not being Partitioned 
> ---------------------------------------------
>
>                 Key: KAFKA-973
>                 URL: https://issues.apache.org/jira/browse/KAFKA-973
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8
>         Environment: Linux
>            Reporter: Subbu Srinivasan
>            Assignee: Neha Narkhede
>              Labels: newbie
>
> I created a two-node cluster:
> 2 ZooKeeper nodes
> 2 brokers
> 1 topic with replication factor 2 and 2 partitions.
> My consumer group has two threads.
> 1) From my Java client, I send a few messages to the topic. I have set 
> multiple brokers: kafka2:9092,kafka1:9092.
> Only one thread in my consumer ever gets the messages. It looks like the 
> producer is not partitioning the requests properly.
> 2) However, if I send some sample messages using the simple console 
> producer, I see multiple threads getting requests, and the load is balanced.
> What am I doing wrong in my client?
> public class KafkaProducer {
>
>     private final Properties props = new Properties();
>     private static AtomicLong counter = new AtomicLong(0);
>     kafka.javaapi.producer.Producer<Integer, String> producer = null;
>
>     public KafkaProducer()
>     {
>         props.put("serializer.class", "kafka.serializer.StringEncoder");
>         props.put("metadata.broker.list", ConfigurationUtility.getKafkaHost());
>         producer = new kafka.javaapi.producer.Producer<Integer, String>(
>                 new ProducerConfig(props));
>     }
>
>     public void sendMessage(String msg) throws Exception
>     {
>         producer.send(new KeyedMessage<Integer, String>(
>                 ConfigurationUtility.getTopicName(), msg));
>     }
>
>     public static void main(String arg[]) throws Exception
>     {
>         ConfigurationUtility.setKafkaHost("kafka2:9092,kafka1:9092");
>         ConfigurationUtility.setTopicName("dnslog");
>         ConfigurationUtility.setZooKeeperHost("kafka1:2181,kafka2:2181");
>         ConfigurationUtility.setConsumerGroupId("dnslog");
>
>         for (int i = 0; i < 2; ++i)
>         {
>             (new KafkaProducer()).sendMessage(UUID.randomUUID().toString());
>         }
>     }
> }

