[
https://issues.apache.org/jira/browse/KAFKA-1343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13950256#comment-13950256
]
schandr edited comment on KAFKA-1343 at 3/28/14 1:47 AM:
---------------------------------------------------------
Thanks for pointing me to the above link.
The kafka.tools.ConsumerOffsetChecker tool helped; I think I found what is
causing the issue.
Here is my configuration:
Kafka 0.8.0
Each topic has 2 partitions
one broker
0 replicas
I copied the producer code from the link below to send messages to both
partitions. I need this because my consumer code uses one thread per partition.
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
I started the producer and I can see the messages sent to both partitions --
please see the attached producer screenshot.
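As an aside, the wiki example routes messages with a custom partitioner, and the heart of any such partitioner is a stable key-to-partition mapping. A minimal JDK-only sketch of that idea (class and method names here are illustrative, not Kafka's Partitioner API):

```java
public class SimplePartitionerSketch {
    // Map a message key to one of numPartitions partitions. Math.abs guards
    // against negative hashCode values (the Integer.MIN_VALUE edge case is
    // ignored in this sketch).
    static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        // With 2 partitions, keys spread across partitions 0 and 1,
        // and the same key always lands on the same partition:
        for (String key : new String[] {"alpha", "beta", "gamma"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 2));
        }
    }
}
```

The important property is only that the mapping is deterministic, so a given key always lands on the same partition.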
On the consumer side, I have the following code:
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(threadCount));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// now launch all the threads
executor = Executors.newFixedThreadPool(threadCount);
// now create an object to consume the messages
int threadNumber = 0;
for (final KafkaStream<byte[], byte[]> stream : streams) {
    executor.submit(new StreamIteratorTask(stream, resourceUUID, topic,
            consumerGroupId, threadNumber));
    threadNumber++;
}
and the StreamIteratorTask run() method looks like:
public void run() {
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> msgAndMetadata = it.next();
        byte[] message = msgAndMetadata.message();
        long offset = msgAndMetadata.offset();
        MessageAndOffset msgAndOffset = new MessageAndOffset(message, offset);
        System.out.println(" Offset: " + msgAndOffset.getOffset()
                + " Message: " + new String(msgAndOffset.getMessageBytes())
                + " for consumer group : " + consumerGroupId);
    }
}
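For anyone reproducing this without a broker, the thread-per-stream model above can be mimicked with one BlockingQueue per partition: take() blocks the way hasNext() does when no timeout is configured. All class names below are stand-ins, not Kafka classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class MockStreamConsumer {
    // Marker object that tells a consumer thread to stop (compared by identity).
    static final byte[] POISON = new byte[0];

    // One consumer thread per "stream": drain messages until the poison-pill
    // marker arrives, mirroring a blocking ConsumerIterator loop.
    static int consume(BlockingQueue<byte[]> stream) throws InterruptedException {
        int count = 0;
        while (true) {
            byte[] message = stream.take();   // blocks, like hasNext() with no timeout
            if (message == POISON) {
                break;
            }
            count++;
        }
        return count;
    }

    public static void main(String[] args) throws Exception {
        int partitions = 2;
        ExecutorService executor = Executors.newFixedThreadPool(partitions);
        List<BlockingQueue<byte[]>> streams = new ArrayList<>();
        List<Future<Integer>> consumed = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            BlockingQueue<byte[]> stream = new LinkedBlockingQueue<>();
            streams.add(stream);
            consumed.add(executor.submit(() -> consume(stream)));
        }
        // "Produce" three messages to partition 0 and two to partition 1.
        for (int i = 0; i < 3; i++) streams.get(0).put(("m" + i).getBytes());
        for (int i = 0; i < 2; i++) streams.get(1).put(("m" + i).getBytes());
        for (BlockingQueue<byte[]> stream : streams) stream.put(POISON);

        System.out.println("partition 0: " + consumed.get(0).get() + " messages");
        System.out.println("partition 1: " + consumed.get(1).get() + " messages");
        executor.shutdown();
    }
}
```

If the producer stops sending and no poison pill arrives, each worker sits forever in take(), which matches the stalling symptom described in the issue.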
and I am setting the following consumer config properties:
private ConsumerConfig createConsumerConfig(final String consumerGroupId) {
    Properties props = new Properties();
    props.put("zookeeper.connect", zookeeperConfigs.get("zookeeper.connect"));
    props.put("zookeeper.session.timeout.ms",
            zookeeperConfigs.get("zookeeper.session.timeout.ms"));
    props.put("zookeeper.sync.time.ms",
            zookeeperConfigs.get("zookeeper.sync.time.ms"));
    props.put("auto.commit.interval.ms",
            zookeeperConfigs.get("auto.commit.interval.ms"));
    props.put("consumer.timeout.ms",
            zookeeperConfigs.get("consumer.timeout.ms"));
    props.put("group.id", consumerGroupId);
    return new ConsumerConfig(props);
}
I had consumer.timeout.ms set to 3000, which was too short; once I increased
it to a minute (since this is a dev environment where I am pumping messages
continuously), it started working. But I noticed something else: if there are
no messages for more than 1 minute (I waited for 5 minutes), the consumers do
not time out.
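One possibility worth checking (an assumption on my part, not something confirmed from the logs): when consumer.timeout.ms fires, hasNext() throws ConsumerTimeoutException, and a task submitted to an ExecutorService swallows any uncaught exception unless the returned Future is inspected. A timed-out thread can therefore die silently, and later "missing" timeouts are really threads that already exited. The JDK-only sketch below (no Kafka classes involved) demonstrates the swallowing behaviour:

```java
import java.util.concurrent.*;

public class SwallowedTimeout {
    // Submit a task that throws (the way a consumer thread does when
    // ConsumerTimeoutException is uncaught) and show that the failure never
    // reaches stderr: it is only visible through the returned Future.
    static String submitAndInspect() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        Future<?> future = executor.submit((Runnable) () -> {
            throw new RuntimeException("simulated ConsumerTimeoutException");
        });
        String cause;
        try {
            future.get();   // the only place the failure surfaces
            cause = "no exception";
        } catch (ExecutionException e) {
            cause = e.getCause().getMessage();
        }
        executor.shutdown();
        return cause;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("task died with: " + submitAndInspect());
    }
}
```

If this is what is happening, wrapping the run() loop in a try/catch around hasNext() (or checking the Futures returned by submit) should make the timeouts visible.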
Should I create a separate bug for the timeout issue?
> Kafka consumer iterator thread stalls
> -------------------------------------
>
> Key: KAFKA-1343
> URL: https://issues.apache.org/jira/browse/KAFKA-1343
> Project: Kafka
> Issue Type: Bug
> Reporter: schandr
> Attachments: producerscreenshot.jpg
>
>
> My dev environment has Kafka 0.8.0. I am using the high-level consumer API. My
> use case has one topic with 2 partitions. My code is using two threads to
> read from the kafka stream. The runnable iterator task has the below
> implementation:
> public void run() {
>     ConsumerIterator<byte[], byte[]> it = stream.iterator();
>     while (it.hasNext()) {
>         MessageAndMetadata<byte[], byte[]> msgAndMetadata = it.next();
>         byte[] message = msgAndMetadata.message();
>         long offset = msgAndMetadata.offset();
>         MessageAndOffset msgAndOffset = new MessageAndOffset(message, offset);
>         System.out.println(" Message offset for topic : " + topicName + ": "
>                 + offset + " thread : " + thread + " Message: "
>                 + new String(message));
>     }
> }
> I am using the kafka console producer to produce messages. When the test starts,
> the above code logs the messages produced so far, but stalls on new messages
> produced by the console producer. There are no exceptions in the kafka logs.
--
This message was sent by Atlassian JIRA
(v6.2#6252)