Hi,

How many consumers can I connect to a Kafka cluster? I am running a Kafka
server with a separate ZooKeeper server on my development box. For testing
purposes I am trying to create a large number of consumers, e.g. 100
or so. But once I have created the 40th consumer I start to get timeouts. I
have also noticed that the connection time gets longer and longer.


My consumer code looks like this:



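// Logger, IMessageQueue, IMQListener and ILogEvent are project-specific
// classes; their imports are omitted here.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer implements Runnable {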

    public long count = 0;

    public long errors = 0;

    private IMQListener<ILogEvent> fct;

    private IMessageQueue context;

    private ConsumerConnector consumer;

    private int a_numThreads = 1;

    private KafkaStream<byte[], byte[]> stream;

    private volatile boolean running = true;

    private String topic;


    @Override
    public void run() {
        Logger.log(KafkaConsumer.class, "run()", "enter >  " + this.topic);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (running && it.hasNext()) {
            try {
                // next() must be called to advance the iterator
                byte[] payload = it.next().message();

                // do something with payload
                count++;

            } catch (Exception e) {
                errors++;
            }
        }

    }





    public KafkaConsumer(IMQListener<ILogEvent> fct, IMessageQueue context,
            String zookeper, String topic) {
        this.fct = fct;
        this.context = context;
        this.topic = topic;


        /**
         * get the kafka streams
         */
        ConsumerConfig consumerConfig = createConsumerConfig(zookeper, "group1");
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, a_numThreads);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        this.stream = streams.get(0);


        /**
         * start one thread reading from the queue
         */
        Thread t = new Thread(this);
        t.start();


    }




    public void shutdown() {
        Logger.log(KafkaConsumer.class, "shutdown()", "enter >  " + this.topic);

        // stop the read loop first, then close the connector
        running = false;

        if (consumer != null) {
            consumer.shutdown();
        }
    }


    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");

        return new ConsumerConfig(props);
    }



}
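
The growing connection time could be measured with a loop along these lines. This is only a rough sketch, not the exact test code: ConnectTimer and timeCreations are made-up names, and the Supplier stands in for the real constructor call so that the snippet compiles without the Kafka jars. It also assumes Java 8 for the lambda syntax.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class ConnectTimer {

    /**
     * Calls the factory n times and returns how long each call took,
     * in milliseconds.
     */
    public static <T> List<Long> timeCreations(int n, Supplier<T> factory) {
        List<Long> millis = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            long start = System.nanoTime();
            // The created object is discarded here; a real test would keep
            // it around so that shutdown() can be called later.
            factory.get();
            millis.add((System.nanoTime() - start) / 1_000_000);
        }
        return millis;
    }

    public static void main(String[] args) {
        // Placeholder factory. In the real test this would be something like
        // () -> new KafkaConsumer(fct, context, zookeeper, topic)
        List<Long> millis = timeCreations(100, Object::new);
        for (int i = 0; i < millis.size(); i++) {
            System.out.println("consumer " + (i + 1) + ": " + millis.get(i) + " ms");
        }
    }
}
```

Printing one line per consumer makes it easy to see at which consumer number the creation time starts to climb.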



Cheers,

Klaus

-- 

Klaus Schaefers
Senior Optimization Manager

Ligatus GmbH
Hohenstaufenring 30-32
D-50674 Köln

Tel.:  +49 (0) 221 / 56939 -784
Fax:  +49 (0) 221 / 56 939 - 599
E-Mail: klaus.schaef...@ligatus.com
Web: www.ligatus.de

HRB Köln 56003
Geschäftsführung:
Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann,
Dipl.-Wirtschaftsingenieur Arne Wolter
