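The call to example.shutdown() at the end of main() in ConsumerGroupExample closes all consumer connections to Kafka. Because main() only sleeps for 10 seconds before making that call, every consumer thread exits shortly after startup. Remove this line and the consumer threads will run forever.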
On Wed, Apr 29, 2015 at 9:42 PM, christopher palm <cpa...@gmail.com> wrote: > Hi All, > > I am trying to get a multi threaded HL consumer working against a 2 broker > Kafka cluster with a 4 partition 2 replica topic. > > The consumer code is set to run with 4 threads, one for each partition. > > The producer code uses the default partitioner and loops indefinitely > feeding events into the topic. (I excluded the while loop in the paste > below) > > What I see is the threads eventually all exit, even though the producer is > still sending events into the topic. > > My understanding is that the consumer thread per partition is the correct > setup. > > Any ideas why this code doesn't continue to consume events as they are > pushed to the topic? > > I suspect I am configuring something wrong here, but am not sure what. > > Thanks, > > Chris > > > *Topic Configuration* > > Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs: > > Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2 > > Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 > > Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2 > > Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2 > > *Producer Code:* > > Properties props = new Properties(); > > props.put("metadata.broker.list", args[0]); > > props.put("zk.connect", args[1]); > > props.put("serializer.class", "kafka.serializer.StringEncoder"); > > props.put("request.required.acks", "1"); > > String TOPIC = args[2]; > > ProducerConfig config = new ProducerConfig(props); > > Producer<String, String> producer = new Producer<String, String>( > config); > > finalEvent = new Timestamp(new Date().getTime()) + "|" > > + truckIds[0] + "|" + driverIds[0] + "|" + > events[random > .nextInt(evtCnt)] > > + "|" + getLatLong(arrayroute17[i]); > > try { > > KeyedMessage<String, String> data = new > KeyedMessage<String, String>(TOPIC, finalEvent); > > LOG.info("Sending 
Messge #: " + routeName[0] + ": " + i +", > msg:" + finalEvent); > > producer.send(data); > > Thread.sleep(1000); > > } catch (Exception e) { > > e.printStackTrace(); > > } > > > *Consumer Code:* > > public class ConsumerTest implements Runnable { > > private KafkaStream m_stream; > > private int m_threadNumber; > > public ConsumerTest(KafkaStream a_stream, int a_threadNumber) { > > m_threadNumber = a_threadNumber; > > m_stream = a_stream; > > } > > public void run() { > > ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); > > while (it.hasNext()){ > > System.out.println("Thread " + m_threadNumber + ": " + new > String(it.next().message())); > > try { > > Thread.sleep(1000); > > }catch (InterruptedException e) { > > e.printStackTrace(); > > } > > } > > System.out.println("Shutting down Thread: " + m_threadNumber); > > } > > } > > public class ConsumerGroupExample { > > private final ConsumerConnector consumer; > > private final String topic; > > private ExecutorService executor; > > > > public ConsumerGroupExample(String a_zookeeper, String a_groupId, > String a_topic) { > > consumer = kafka.consumer.Consumer.createJavaConsumerConnector( > > createConsumerConfig(a_zookeeper, a_groupId)); > > this.topic = a_topic; > > } > > > > public void shutdown() { > > if (consumer != null) consumer.shutdown(); > > if (executor != null) executor.shutdown(); > > try { > > if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { > > System.out.println("Timed out waiting for consumer threads > to shut down, exiting uncleanly"); > > } > > } catch (InterruptedException e) { > > System.out.println("Interrupted during shutdown, exiting > uncleanly"); > > } > > } > > > > public void run(int a_numThreads) { > > Map<String, Integer> topicCountMap = new HashMap<String, > Integer>(); > > topicCountMap.put(topic, new Integer(a_numThreads)); > > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = > consumer.createMessageStreams(topicCountMap); > > 
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); > > executor = Executors.newFixedThreadPool(a_numThreads); > > int threadNumber = 0; > > for (final KafkaStream stream : streams) { > > executor.submit(new ConsumerTest(stream, threadNumber)); > > threadNumber++; > > } > > } > > > > private static ConsumerConfig createConsumerConfig(String a_zookeeper, > String a_groupId) { > > Properties props = new Properties(); > > props.put("zookeeper.connect", a_zookeeper); > > props.put("group.id", a_groupId); > > props.put("zookeeper.session.timeout.ms", "400"); > > props.put("zookeeper.sync.time.ms", "200"); > > props.put("auto.commit.interval.ms", "1000"); > > props.put("consumer.timeout.ms", "-1"); > > return new ConsumerConfig(props); > > } > > > > public static void main(String[] args) { > > String zooKeeper = args[0]; > > String groupId = args[1]; > > String topic = args[2]; > > int threads = Integer.parseInt(args[3]); > > ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, > groupId, topic); > > example.run(threads); > > try { > > Thread.sleep(10000); > > } catch (InterruptedException ie) { > > > > } > > example.shutdown(); > > } > > } > -- Regards, Tao