Hi,
I see the following scenario:
1. I send messages to some topic X and can see a log folder named X-0
(the zeroth partition) on the Kafka broker, containing the files xxx.log
and xxx.index. So I guess this part is fine.
2. Then I fire up the consumer for topic X. It is able to find two streams
(mapping to the two partitions I have defined):
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            consumer.createMessageStreams(topicCountMap);
    int threadNumber = 0;
    for (String topic : topicCountMap.keySet()) {
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        for (KafkaStream<byte[], byte[]> stream : streams) {
            System.out.println("threadNo =" + threadNumber + " for topic = " + topic);
            // One thread per stream (assumes ConsumerThreadRunnable implements Runnable)
            new Thread(new ConsumerThreadRunnable(stream, threadNumber, topic)).start();
            threadNumber++;
        }
    }
However, I don't get any messages in the ConsumerThreadRunnable here:
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        byte[] nextMessageByteArray = it.next().message();
    }
If I start the consumer first and then restart the producer thread that
sends the messages for topic X, the consumer is able to receive the messages.
From the Kafka docs, the high-level consumer thread does long polling until a
message is available.
What am I doing wrong? Any ideas on how to get around the problem?
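One thing I have not ruled out: as far as I understand, the 0.8 high-level
consumer has an auto.offset.reset setting that defaults to "largest", so a
consumer group with no committed offset starts at the end of the log and
never sees messages produced before it connected. That would match what I
observe. Below is a minimal sketch of the consumer properties I could try,
assuming that is the cause. The ZooKeeper address and group id are
placeholders, and buildConsumerProps is only a hypothetical helper name.

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Hypothetical helper: builds properties for the 0.8 high-level consumer.
    static Properties buildConsumerProps(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        // When the group has no committed offset, start from the oldest
        // available message instead of the default "largest" (end of log).
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values, not taken from my actual setup.
        Properties props = buildConsumerProps("localhost:2181", "test-group");
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

These properties would go into new ConsumerConfig(props) before calling
Consumer.createJavaConsumerConnector(...). As I understand it, the setting
only applies when the group has no stored offset, so a fresh group.id may be
needed to see any difference.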
thanks!
--
Kind Regards,
Shafaq