Hi All,
I am getting ridiculously low producer and consumer throughput.
I am using default config values for producer, consumer and broker
which are very good starting points, as they should yield sufficient
throughput.
Only config that I changed on the server is "num-partitions". Changed
it to 20 (instead of 1). With this change the throughput increased to
2k messages per second (size 1k), but still it is far lower than what
I would have expected.
Appreciate if you can point to settings/changes-in-code needs to be done
to get higher throughput.
====Consumer Code=====
long startTime = System.currentTimeMillis();
long endTime = startTime + runDuration*1000l;
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("groupid", subscriptionName); // to support multiple
subscribers
props.put("zk.sessiontimeout.ms", "400");
props.put("zk.synctime.ms", "200");
props.put("autocommit.interval.ms", "1000");
consConfig = new ConsumerConfig(props);
consumer =
kafka.consumer.Consumer.createJavaConsumerConnector(consConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topicName, new Integer(1)); // has the topic
to which to subscribe to
Map<String, List<KafkaMessageStream<Message>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
KafkaMessageStream<Message> stream = consumerMap.get(topicName).get(0);
ConsumerIterator<Message> it = stream.iterator();
while(System.currentTimeMillis() <= endTime )
{
it.next(); // discard data
consumeMsgCount.incrementAndGet();
}
====End consumer CODE============================
=====Producer CODE========================
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("zk.connect", "localhost:2181");
// Use random partitioner. Don't need the key type. Just
set it to Integer.
// The message is of type String.
producer = new kafka.javaapi.producer.Producer<Integer,
String>(new ProducerConfig(props));
long endTime = startTime + runDuration*1000l; // run duration
is in seconds
while(System.currentTimeMillis() <= endTime )
{
String msg =
org.apache.commons.lang.RandomStringUtils.random(msgSizes.get(0));
producer.send(new ProducerData<Integer, String>(topicName, msg));
pc.incrementAndGet();
}
java.util.Date date = new java.util.Date(System.currentTimeMillis());
System.out.println(date+" :: stopped producer for topic"+topicName);
=====END Producer CODE========================
--
Regards,
Praveen Ramachandra
--
--
Regards,
Praveen Ramachandra