btw -- curious to know how well your kafka broker handles the load... please do inform us your result.
Regards, Nitin Kumar Sharma. On Mon, Dec 22, 2014 at 9:52 AM, nitin sharma <kumarsharma.ni...@gmail.com> wrote: > Hey Pramod, > > few things: > a. You can keep 2 brokers but you can increase the ProducerSend thread on > your producer side to push more messages. best way try to create more > threads that execute the loop where "send" is called. > b. try to avoid/reduce putting any logic computation in the while loop .. > You can try using for loop instead of while :. > for(int i=0; i<messageCount;i++) > { > KeyedMessage<String, String> message = > new KeyedMessage<String, String>(topic, i, msg); > > producer.send(message); > } > > Regards, > Nitin Kumar Sharma. > > > On Sun, Dec 21, 2014 at 11:18 PM, Pramod Deshmukh <dpram...@gmail.com> > wrote: > >> *Kafka: *Apache Kafka 0.8.1.1 >> >> >> *SImplePartitioner.java* >> public int partition(Object key, int a_numPartitions) { >> int partition = Integer.parseInt((String)key); >> LOG.debug("SimplePartitioner Partion: " + partition); >> return partition; >> } >> >> >> >> On Sun, Dec 21, 2014 at 10:54 PM, Pramod Deshmukh <dpram...@gmail.com> >> wrote: >> >> > I have a requirement to prove kafka producer can produce 1 million >> > events/second to Kafka cluster. >> > >> > So far, best I could achieve is 200k events/sec on topic with 2 >> > partitions. The latency increases with adding more partitions so I want >> to >> > test with 2 partitions for now. >> > >> > Below are the details along with produce code (java). How can I achieve >> > produce 1million event/sec.? I went thru kafka benchmarking blog as >> well. >> > >> > >> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines >> > >> > *Kafka cluster:* 3 brokers on 3 servers. Each sever is 16 TB (16 JBODs), >> > 64GB RAM. >> > *Broker:* Allocated 6GB, 16 io.threads, 8 network threads. >> > *Topic: 2* partition, replication factor of 1 (Get high latency) >> > *Zookeepers: *3 zk instances running individually on master nodes (not >> > co-located with kafka broker/servers) >> > >> > >> > *Producer Code:* >> > public class TestProducer { >> > >> > private static String msg = "TEST KAFKA PERFORMANCE"; >> > private static Logger LOG = Logger.getLogger(TestProducer.class); >> > >> > public static void main(String... args){ >> > System.out.println("START - Test Producer"); >> > >> > long messageCount = Long.parseLong(args[0]); >> > long messageCountForStat = Long.parseLong(args[0]); >> > String topic = args[1]; >> > String brokerList = args[2]; >> > int batchCount = Integer.parseInt(args[3]); >> > int topicPartions = Integer.parseInt(args[4]); >> > Producer<String, String> producer = getProducer(brokerList, >> > batchCount); >> > Date startTime = new Date(System.currentTimeMillis()); >> > Random rnd = new Random(); >> > String partition = ""; >> > //Produce messages. >> > while (messageCount != 0) { >> > partition = ""+(int)messageCount%topicPartions; >> > KeyedMessage<String, String> message = >> > new KeyedMessage<String, String>(topic, partition, >> > msg); >> > producer.send(message); >> > messageCount--; >> > } >> > >> > Date endTime = new Date(System.currentTimeMillis()); >> > System.out.println("#########################################"); >> > System.out.println("MESSAGES SENT: " + messageCountForStat); >> > System.out.println("START TIME: " + startTime); >> > System.out.println("END TIME: " + endTime); >> > System.out.println("#########################################"); >> > System.out.println("END - Test Producer"); >> > } >> > >> > public static Producer<String, String> getProducer(String >> brokerList, >> > int batchSize) { >> > >> > props.put("metadata.broker.list", brokerList); >> > props.put("serializer.class", "kafka.serializer.StringEncoder"); >> > props.put("partitioner.class", "com.my.SimplePartitioner"); >> > props.put("request.required.acks", "0"); >> > props.put("producer.type", "async"); >> > props.put("compression.codec", "snappy"); >> > props.put("batch.num.messages", Integer.toString(batchSize)); >> > >> > ProducerConfig config = new ProducerConfig(props); >> > >> > Producer<String, String> producer = new Producer<String, >> > String>(config); >> > return producer; >> > } >> > >> > } >> > >> > >