Hi Jason, 

I am using 0.9 broker. 

One more observation. I had written producer code with 0.9 - Even with Producer 
code, I had hanging issue where send method was hanging requesting metadata. 
Thread-dump below 

"main" prio=6 tid=0x0000000002238000 nid=0x1390 in Object.wait() 
[0x00000000025bf000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
            at java.lang.Object.wait(Native Method)
            - waiting on <0x00000007aecacea0> (a 
org.apache.kafka.clients.Metadata)
            at org.apache.kafka.clients.Metadata.awaitUpdate(Metadata.java:121)
            - locked <0x00000007aecacea0> (a org.apache.kafka.clients.Metadata)
            at 
org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:483)
            at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:412)
            at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:339)
            at kafkaexperiments.ProducerDriver.main(ProducerDriver.java:41)

Then I included metadata.fetch.timeout.ms=1 and then producer started working. 
But when I poll the same topic using kafka-console-consumer.sh, 
console-consumer also hangs. 



Regards
Muthu


-----Original Message-----
From: Jason Gustafson [mailto:ja...@confluent.io] 
Sent: Friday, March 04, 2016 5:33 AM
To: users@kafka.apache.org
Subject: Re: Consumer deadlock

Hi there,

Just to clarify, is the broker still on 0.8? Unfortunately, the new consumer 
needs 0.9. That probably would explain the hanging.

-Jason

On Thu, Mar 3, 2016 at 2:28 AM, Muthukumaran K <muthukumara...@ericsson.com>
wrote:

> Ewen,
>
> By new Consumer API, you mean KafkaConsumer ? I have an issue with a 
> poll in 0.9.0.1. poll hangs indefinitely even with the timeout
>
> Following is the consumer code which I am using. Any pointers would be 
> helpful
>
> public class ConsumerLoop implements Runnable {
>
>
>     private final KafkaConsumer<String, String> consumer;
>     private final List<String> topics;
>     private final int id;
>
>     public ConsumerLoop(int id,
>                       String groupId,
>                       List<String> topics) {
>         this.id = id;
>         this.topics = topics;
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "192.168.56.101:9092");
>         props.put("group.id", groupId);
>         props.put("auto.offset.reset", "earliest");
>         props.put("key.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer", 
> "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("metadata.fetch.timeout.ms", 1);
>
>         this.consumer = new KafkaConsumer<>(props);
>
>     }
>
>     @Override
>     public void run() {
>         try {
>
>             System.out.println("Starting consumer ID : " + id +
>                     " Thread : " + Thread.currentThread().getName() +
>                     " Topic : " + topics.toString() +
>                     " ... ");
>             long startTime = System.currentTimeMillis();
>             int recordCount = 0;
>
>           consumer.subscribe(topics);
>
>           System.out.println("Consumer-ID " + id + " after 
> subscribe...");
>
>           while (true) {
>             ConsumerRecords<String, String> records = 
> consumer.poll(10000);
>
>             System.out.println("Consumer-ID " + id + " after 
> poll...");
>
>
>             for (ConsumerRecord<String, String> record : records) {
>               Map<String, Object> data = new HashMap<>();
>               data.put("partition", record.partition());
>               data.put("offset", record.offset());
>               data.put("value", record.value());
>               System.out.println(
>                       "Consumer-ID : " + this.id +
>                               ": " + data +
>                               " Thread_name : " + 
> Thread.currentThread().getName());
>               recordCount++;
>
>             }
>             long endTime = System.currentTimeMillis();
>             long duration = (endTime - startTime)/1000;
>             System.out.println("###### rate : " + recordCount/duration + "
> msgs/sec on Consumer ID " + id);
>
>           }
>         } catch (WakeupException e) {
>           // ignore for shutdown
>         } finally {
>           consumer.close();
>         }
>     }
>
>     public void shutdown() {
>
>         consumer.wakeup();
>     }
>
> Regards
> Muthu
>
>
> -----Original Message-----
> From: Ewen Cheslack-Postava [mailto:e...@confluent.io]
> Sent: Thursday, March 03, 2016 2:21 PM
> To: users@kafka.apache.org
> Subject: Re: Consumer deadlock
>
> Take a look at the consumer.timeout.ms setting if you don't want the 
> iterator to block indefinitely.
>
> And a better long term solution is to switch to the new consumer, but 
> that obviously requires much more significant code changes. The new 
> consumer API is a single-threaded poll-based API where you can always 
> specify timeouts to the consumer's poll() method (though it currently 
> has some limitations to how it enforces that timeout).
>
> -Ewen
>
> On Wed, Mar 2, 2016 at 11:24 AM, Oleg Zhurakousky < 
> ozhurakou...@hortonworks.com> wrote:
>
> > Guys
> >
> > We have a consumer deadlock and here is the relevant dump:
> >
> > "Timer-Driven Process Thread-10" Id=76 TIMED_WAITING  on
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@
> 39775787
> >         at sun.misc.Unsafe.park(Native Method)
> >         at
> > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> >         at
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.
> awaitNanos(AbstractQueuedSynchronizer.java:2078)
> >         at
> >
> java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java
> :467)
> >         at
> > kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65)
> >         at
> > kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
> >         at
> > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
> >         at
> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> >         . . . . .
> >
> > What worries me is the fact that ‘hasNext’ is essentially a blocking 
> > operation. I can’t seem to find a single reason when it would be 
> > useful, hence I am calling it a bug, but hopefully someone can clarify.
> > Kafka version is 0.8.*
> >
> > Cheers
> > Oleg
> >
> >
>
>
> --
> Thanks,
> Ewen
>

Reply via email to