Hi,
I use the high-level consumer API to get messages, but I want to read them in
a non-blocking way. Below is the code:

ConsumerConnector consumer =
    kafka.consumer.Consumer.createJavaConsumerConnector(consumerconfig);
// Request a single stream for the topic
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("topic", 1);
Map<String, List<KafkaMessageStream<Message>>> consumerMap =
    consumer.createMessageStreams(topicCountMap);
KafkaMessageStream<Message> stream = consumerMap.get("topic").get(0);
ConsumerIterator<Message> it = stream.iterator();
// hasNext() blocks here when no message is available
while (it.hasNext())
{
    ByteBuffer buffer = it.next().payload();
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    System.out.println(new String(bytes));
}

The problem is that when there are no messages, the program blocks at
"it.hasNext()", but I want to break out of the loop instead. Is there a way
to do this?
Thanks!
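
P.S. To make the behavior I am after concrete, here is a minimal sketch. It
does not use Kafka at all: it assumes a plain java.util.concurrent.BlockingQueue
as a stand-in for the message stream, and the class name TimeoutDrainSketch and
the method drain are hypothetical names of my own. The loop waits a bounded
time for each message and exits when nothing arrives, instead of blocking
forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the consumer loop; not part of Kafka's API.
class TimeoutDrainSketch {

    // Collects messages until the queue stays empty for timeoutMs, then returns.
    static List<String> drain(BlockingQueue<String> queue, long timeoutMs)
            throws InterruptedException {
        List<String> received = new ArrayList<String>();
        while (true) {
            // poll() waits at most timeoutMs and returns null if nothing arrived
            String message = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (message == null) {
                break; // no message within the timeout: leave the loop
            }
            received.add(message);
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        queue.add("first");
        queue.add("second");
        // Returns once the queue has been empty for 100 ms
        System.out.println(drain(queue, 100));
    }
}
```

What I am looking for is the equivalent of that bounded wait on the consumer
iterator, so the while loop can end when the topic is idle.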