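The example.shutdown(); call in ConsumerGroupExample.main closes all consumer
connections to Kafka. It runs right after the 10-second Thread.sleep, which is
why your consumer threads exit. If you remove this line, the consumer threads
will run forever.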

On Wed, Apr 29, 2015 at 9:42 PM, christopher palm <cpa...@gmail.com> wrote:

> Hi All,
>
> I am trying to get a multi-threaded HL consumer working against a 2-broker
> Kafka cluster with a 4-partition, 2-replica topic.
>
> The consumer code is set to run with 4 threads, one for each partition.
>
> The producer code uses the default partitioner and loops indefinitely,
> feeding events into the topic. (I excluded the while loop in the paste
> below.)
>
> What I see is that the threads eventually all exit, even though the producer
> is still sending events into the topic.
>
> My understanding is that the consumer thread per partition is the correct
> setup.
>
> Any ideas why this code doesn't continue to consume events as they are
> pushed to the topic?
>
> I suspect I am configuring something wrong here, but am not sure what.
>
> Thanks,
>
> Chris
>
>
> *Topic Configuration*
>
> Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
>
> Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
>
> Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
>
> Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2
>
>  Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
>
> *Producer Code:*
>
>      Properties props = new Properties();
>
>         props.put("metadata.broker.list", args[0]);
>
>         props.put("zk.connect", args[1]);
>
>         props.put("serializer.class", "kafka.serializer.StringEncoder");
>
>         props.put("request.required.acks", "1");
>
>         String TOPIC = args[2];
>
>         ProducerConfig config = new ProducerConfig(props);
>
>         Producer<String, String> producer = new Producer<String, String>(
> config);
>
>         finalEvent = new Timestamp(new Date().getTime()) + "|"
>
>                     + truckIds[0] + "|" + driverIds[0] + "|" +
> events[random
> .nextInt(evtCnt)]
>
>                     + "|" + getLatLong(arrayroute17[i]);
>
>         try {
>
>                 KeyedMessage<String, String> data = new
> KeyedMessage<String, String>(TOPIC, finalEvent);
>
>                 LOG.info("Sending Messge #: " + routeName[0] + ": " + i +",
> msg:" + finalEvent);
>
>                 producer.send(data);
>
>                 Thread.sleep(1000);
>
>             } catch (Exception e) {
>
>                 e.printStackTrace();
>
>             }
>
>
> *Consumer Code:*
>
> public class ConsumerTest implements Runnable {
>
>    private KafkaStream m_stream;
>
>    private int m_threadNumber;
>
>    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
>
>        m_threadNumber = a_threadNumber;
>
>        m_stream = a_stream;
>
>    }
>
>    public void run() {
>
>        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
>
>        while (it.hasNext()){
>
>            System.out.println("Thread " + m_threadNumber + ": " + new
> String(it.next().message()));
>
>            try {
>
>              Thread.sleep(1000);
>
>             }catch (InterruptedException e) {
>
>                  e.printStackTrace();
>
>  }
>
>        }
>
>        System.out.println("Shutting down Thread: " + m_threadNumber);
>
>    }
>
> }
>
> public class ConsumerGroupExample {
>
>     private final ConsumerConnector consumer;
>
>     private final String topic;
>
>     private  ExecutorService executor;
>
>
>
>     public ConsumerGroupExample(String a_zookeeper, String a_groupId,
> String a_topic) {
>
>         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
>
>                 createConsumerConfig(a_zookeeper, a_groupId));
>
>         this.topic = a_topic;
>
>     }
>
>
>
>     public void shutdown() {
>
>         if (consumer != null) consumer.shutdown();
>
>         if (executor != null) executor.shutdown();
>
>         try {
>
>             if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
>
>                 System.out.println("Timed out waiting for consumer threads
> to shut down, exiting uncleanly");
>
>             }
>
>         } catch (InterruptedException e) {
>
>             System.out.println("Interrupted during shutdown, exiting
> uncleanly");
>
>         }
>
>    }
>
>
>
>     public void run(int a_numThreads) {
>
>         Map<String, Integer> topicCountMap = new HashMap<String,
> Integer>();
>
>         topicCountMap.put(topic, new Integer(a_numThreads));
>
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
>
>         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
>
>         executor = Executors.newFixedThreadPool(a_numThreads);
>
>         int threadNumber = 0;
>
>         for (final KafkaStream stream : streams) {
>
>             executor.submit(new ConsumerTest(stream, threadNumber));
>
>             threadNumber++;
>
>         }
>
>     }
>
>
>
>     private static ConsumerConfig createConsumerConfig(String a_zookeeper,
> String a_groupId) {
>
>         Properties props = new Properties();
>
>         props.put("zookeeper.connect", a_zookeeper);
>
>         props.put("group.id", a_groupId);
>
>         props.put("zookeeper.session.timeout.ms", "400");
>
>         props.put("zookeeper.sync.time.ms", "200");
>
>         props.put("auto.commit.interval.ms", "1000");
>
>         props.put("consumer.timeout.ms", "-1");
>
>          return new ConsumerConfig(props);
>
>     }
>
>
>
>     public static void main(String[] args) {
>
>         String zooKeeper = args[0];
>
>         String groupId = args[1];
>
>         String topic = args[2];
>
>         int threads = Integer.parseInt(args[3]);
>
>         ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper,
> groupId, topic);
>
>         example.run(threads);
>
>         try {
>
>             Thread.sleep(10000);
>
>         } catch (InterruptedException ie) {
>
>
>
>         }
>
>         example.shutdown();
>
>     }
>
> }
>



-- 
Regards,
Tao

Reply via email to