[ 
https://issues.apache.org/jira/browse/KAFKA-1343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13950256#comment-13950256
 ] 

schandr edited comment on KAFKA-1343 at 3/28/14 1:47 AM:
---------------------------------------------------------

Thanks for pointing me to the above link.
The  kafka.tools.ConsumerOffsetChecker tool helped me...I think I found what is 
causing the issue

Here is my configuration
Kafka 0.8.0
Each topic has 2 partition
one broker
0 replicas

I copied the producer code from the link below to send message to both 
partitions. I need this because, my consumer code, is using number of threads 
per partition
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
I started the producer and I can see the messages sent to both the partition.-- 
Please look the producer screen shot.

On the consumer side, i have the following code

                Map<String, Integer> topicCountMap = new HashMap<String, 
Integer>();
                topicCountMap.put(topic, new Integer(threadCount));
                Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
consumer
                                .createMessageStreams(topicCountMap);
                List<KafkaStream<byte[], byte[]>> streams = 
consumerMap.get(topic);

                // now launch all the threads
                executor = Executors.newFixedThreadPool(threadCount);
                // now create an object to consume the messages
                int threadNumber = 0;
                for (final KafkaStream<byte[], byte[]> stream : streams) {
                        executor.submit(new StreamIteratorTask(stream, 
resourceUUID, topic,consumerGroupId, threadNumber));

and the StreamIteratorTask run method looks like

        public void run() {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()){
                MessageAndMetadata<byte[], byte[]> msgAndmtdata = it.next();
                        byte message[] = msgAndmtdata.message();
                        long offset = msgAndmtdata.offset();
                        MessageAndOffset msgAndOffst = new 
MessageAndOffset(message, offset);
System.out.println(" Offset: " + msgAndOffset.getOffset()
                                + " Message: " + new 
String(msgAndOffset.getMessageBytes())
                                +" for consumer group : "  + consumerGroupId);

and i am setting the following consumer config properties

private ConsumerConfig createConsumerConfig(final String consumerGroupId) {
                Properties props = new Properties();
                props.put("zookeeper.connect",                          
zookeeperConfigs.get("zookeeper.connect"));
                props.put("zookeeper.session.timeout.ms",       
zookeeperConfigs.get("zookeeper.session.timeout.ms"));
                props.put("zookeeper.sync.time.ms",             
zookeeperConfigs.get("zookeeper.sync.time.ms"));
                props.put("auto.commit.interval.ms",            
zookeeperConfigs.get("auto.commit.interval.ms"));
                props.put("consumer.timeout.ms",                        
zookeeperConfigs.get("consumer.timeout.ms"));
                props.put("group.id",                                           
consumerGroupId);
                
                return new ConsumerConfig(props);
        }
I had consumer.timeout.ms set to 3000, which was too little, once I increased 
it to a min(since it a dev env where I am pumping messages continously), it 
started working. But I noticed something else, where if there are no messages 
more than 1 min(I waited for 5 min), the consumers are not timing out.

Should I create a seperate bug for the time out issue? 


was (Author: schandr):
Thanks for pointing me to the above link.
The  kafka.tools.ConsumerOffsetChecker tool helped me...I think I found what is 
causing the issue..but trying to understand the why part though.

Here is my configuration
Kafka 0.8.0
Each topic has 2 partition
one broker
0 replicas

I copied the producer code from the link below to send message to both 
partitions. I need this because, my consumer code, is using number of threads 
per partition
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
I started the producer and I can see the messages sent to both the partition.

> Kafka consumer iterator thread stalls
> -------------------------------------
>
>                 Key: KAFKA-1343
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1343
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: schandr
>         Attachments: producerscreenshot.jpg
>
>
> My dev enviornment has Kafka 0.8.0. I am using High level consumer API. My 
> use case has one topic with 2 partitions. My code is using two threads to 
> read from the kafka stream. The runnable iterator task has the below 
> implementation
>       public void run() {
>               ConsumerIterator<byte[], byte[]> it = stream.iterator();
>         while (it.hasNext()){
>               MessageAndMetadata<byte[], byte[]> msgAndmtdata = it.next();
>                       byte message[] = msgAndmtdata.message();
>                       long offset = msgAndmtdata.offset();
>                       MessageAndOffset msgAndOffst = new 
> MessageAndOffset(message, offset);
>                       System.out.println(" Message offset for topic : " + 
> topicName + ": " + offset + "thread :" + thread + "Message: " + new 
> String(message));
>                       
>         }
>       }
> I am using kafka console producer to produce messages. When the test starts, 
> the above logs the messages produced so far, but stalls for new messages 
> produced by the console producer. No exception in the kafka logs.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to