Hi,

how many consumers can I connect to a Kafka cluster? I am running a Kafka server with a separate ZooKeeper server on my development box. For testing purposes I am trying to create a large number of consumers, e.g. 100 or so. But once I have created the 40th consumer, I start to get timeouts. I have also noticed that the connection time gets longer and longer with each additional consumer.
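To put numbers on the slowdown, the creation time of each consumer could be logged. The following is only a minimal sketch: `ConnectTimer` and `timeCreations` are hypothetical names, not part of Kafka, and the factory in `main` is a stand-in that would have to be replaced by the real consumer constructor call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class ConnectTimer {

    /**
     * Calls the factory n times, keeps every created object in 'created'
     * and returns the elapsed wall-clock time of each call in milliseconds.
     */
    public static <T> List<Long> timeCreations(int n, Supplier<T> factory, List<T> created) {
        List<Long> millis = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            long start = System.nanoTime();
            created.add(factory.get());
            millis.add((System.nanoTime() - start) / 1_000_000);
        }
        return millis;
    }

    public static void main(String[] args) {
        List<Object> created = new ArrayList<>();
        // Stand-in factory (assumption). In the real test this would be
        // something like: () -> new KafkaConsumer(listener, context, zookeeper, topic)
        List<Long> millis = timeCreations(100, Object::new, created);
        for (int i = 0; i < millis.size(); i++) {
            System.out.println("consumer " + (i + 1) + ": " + millis.get(i) + " ms");
        }
    }
}
```

Keeping the created objects in a list means they stay connected while the later ones are created, which matches the test described above.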
My consumer code looks like this:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    // Logger, IMessageQueue, IMQListener and ILogEvent are classes from my own project.
    public class KafkaConsumer implements Runnable {

        public long count = 0;
        public long errors = 0;

        private IMQListener<ILogEvent> fct;
        private IMessageQueue context;
        private ConsumerConnector consumer;
        private int a_numThreads = 1;
        private KafkaStream<byte[], byte[]> stream;
        private volatile boolean running = true;
        private String topic;

        @Override
        public void run() {
            Logger.log(KafkaConsumer.class, "run()", "enter > " + this.topic);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (running && it.hasNext()) {
                try {
                    // fetch the next message so the iterator advances
                    byte[] payload = it.next().message();
                    // do something
                    count++;
                } catch (Exception e) {
                    errors++;
                }
            }
        }

        public KafkaConsumer(IMQListener<ILogEvent> fct, IMessageQueue context,
                             String zookeper, String topic) {
            this.fct = fct;
            this.context = context;
            this.topic = topic;

            /**
             * get the kafka streams
             */
            ConsumerConfig consumerConfig = createConsumerConfig(zookeper, "group1");
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, a_numThreads);
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
            this.stream = streams.get(0);

            /**
             * start one thread reading from the queue
             */
            Thread t = new Thread(this);
            t.start();
        }

        public void shutdown() {
            Logger.log(KafkaConsumer.class, "shutdown()", "enter > " + this.topic);
            if (consumer != null) consumer.shutdown();
            running = false;
        }

        private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
            Properties props = new Properties();
            props.put("zookeeper.connect", a_zookeeper);
            props.put("group.id", a_groupId);
            props.put("zookeeper.session.timeout.ms", "400");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            return new ConsumerConfig(props);
        }
    }

Cheers,
Klaus

--
Klaus Schaefers
Senior Optimization Manager
Ligatus GmbH
Hohenstaufenring 30-32
D-50674 Köln
Tel.: +49 (0) 221 / 56939 -784
Fax: +49 (0) 221 / 56 939 - 599
E-Mail: klaus.schaef...@ligatus.com
Web: www.ligatus.de
HRB Köln 56003
Geschäftsführung: Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann, Dipl.-Wirtschaftsingenieur Arne Wolter