Hi Jason, 

I am using a 0.9 broker. 

One more observation: I also wrote producer code against 0.9, and it had the 
same hanging issue. The send method was stuck requesting metadata. 
Thread dump below: 

"main" prio=6 tid=0x0000000002238000 nid=0x1390 in Object.wait() 
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
            at java.lang.Object.wait(Native Method)
            - waiting on <0x00000007aecacea0> (a 
            at org.apache.kafka.clients.Metadata.awaitUpdate(Metadata.java:121)
            - locked <0x00000007aecacea0> (a org.apache.kafka.clients.Metadata)
            at kafkaexperiments.ProducerDriver.main(ProducerDriver.java:41)

Then I set metadata.fetch.timeout.ms=1 and the producer started working. 
But when I consume from the same topic using kafka-console-consumer.sh, 
the console consumer also hangs. 
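
The thread dump above shows the main thread in a timed Object.wait() inside
Metadata.awaitUpdate, which suggests that send blocks until metadata arrives
or a configured timeout expires. A minimal sketch of that bounded-wait pattern
follows. It is a simplified model and not Kafka's code: the class
BoundedMetadataWait and its methods are hypothetical names chosen for
illustration. Under that assumption, a 1 ms timeout makes the wait return
almost immediately, without necessarily fixing whatever keeps the metadata
from arriving.

```java
import java.util.concurrent.TimeoutException;

// Hypothetical model of a bounded wait on an object monitor (not Kafka's class).
public class BoundedMetadataWait {
    private int version = 0;

    /** Called by a background thread when fresh metadata arrives. */
    public synchronized void update() {
        version++;
        notifyAll();
    }

    public synchronized int version() {
        return version;
    }

    /** Blocks until the version advances past lastVersion or maxWaitMs elapses. */
    public synchronized void awaitUpdate(int lastVersion, long maxWaitMs)
            throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (version <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                throw new TimeoutException("no metadata update within " + maxWaitMs + " ms");
            }
            wait(remaining); // shows up as TIMED_WAITING in a thread dump
        }
    }

    public static void main(String[] args) throws Exception {
        BoundedMetadataWait metadata = new BoundedMetadataWait();

        // Nothing updates the metadata, so a 1 ms wait fails fast.
        try {
            metadata.awaitUpdate(metadata.version(), 1);
        } catch (TimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }

        // With a background update, the same call returns normally.
        Thread updater = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            metadata.update();
        });
        updater.start();
        metadata.awaitUpdate(0, 5000);
        System.out.println("metadata version is now " + metadata.version());
        updater.join();
    }
}
```

In this model the caller decides what to do after the timeout; the wait itself
only bounds how long the thread stays parked.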


Hi there,

Just to clarify, is the broker still on 0.8? Unfortunately, the new consumer 
needs 0.9. That probably would explain the hanging.


On Thu, Mar 3, 2016 at 2:28 AM, Muthukumaran K <muthukumara...@ericsson.com> wrote:

> Ewen,
> By new Consumer API, you mean KafkaConsumer? I have an issue where 
> poll() hangs indefinitely even with a timeout.
> The following is the consumer code I am using. Any pointers would be 
> helpful.
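> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.errors.WakeupException;
>
> public class ConsumerLoop implements Runnable {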
>     private final KafkaConsumer<String, String> consumer;
>     private final List<String> topics;
>     private final int id;
>     public ConsumerLoop(int id,
>                       String groupId,
>                       List<String> topics) {
>         this.id = id;
>         this.topics = topics;
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "");
>         props.put("group.id", groupId);
>         props.put("auto.offset.reset", "earliest");
>         props.put("key.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("metadata.fetch.timeout.ms", 1);
>         this.consumer = new KafkaConsumer<>(props);
>     }
>     @Override
>     public void run() {
>         try {
>             System.out.println("Starting consumer ID : " + id +
>                     " Thread : " + Thread.currentThread().getName() +
>                     " Topic : " + topics.toString() +
>                     " ... ");
>             long startTime = System.currentTimeMillis();
>             int recordCount = 0;
>           consumer.subscribe(topics);
>           System.out.println("Consumer-ID " + id + " after subscribe...");
>           while (true) {
>             ConsumerRecords<String, String> records = consumer.poll(10000);
>             System.out.println("Consumer-ID " + id + " after poll...");
>             for (ConsumerRecord<String, String> record : records) {
>               Map<String, Object> data = new HashMap<>();
>               data.put("partition", record.partition());
>               data.put("offset", record.offset());
>               data.put("value", record.value());
>               System.out.println(
>                       "Consumer-ID : " + this.id +
>                               ": " + data +
>                               " Thread_name : " + Thread.currentThread().getName());
>               recordCount++;
>             }
>             long endTime = System.currentTimeMillis();
>             // Use at least 1 s so the rate never divides by zero.
>             long duration = Math.max(1, (endTime - startTime) / 1000);
>             System.out.println("###### rate : " + recordCount / duration +
>                     " msgs/sec on Consumer ID " + id);
>           }
>         } catch (WakeupException e) {
>           // ignore for shutdown
>         } finally {
>           consumer.close();
>         }
>     }
>     public void shutdown() {
>         consumer.wakeup();
>     }
> }
> Regards
> Muthu
>
> Take a look at the consumer.timeout.ms setting if you don't want the 
> iterator to block indefinitely.
> And a better long term solution is to switch to the new consumer, but 
> that obviously requires much more significant code changes. The new 
> consumer API is a single-threaded poll-based API where you can always 
> specify timeouts to the consumer's poll() method (though it currently 
> has some limitations in how it enforces that timeout).
> -Ewen
> On Wed, Mar 2, 2016 at 11:24 AM, Oleg Zhurakousky < 
> ozhurakou...@hortonworks.com> wrote:
> > Guys
> >
> > We have a consumer deadlock and here is the relevant dump:
> >
> > "Timer-Driven Process Thread-10" Id=76 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@39775787
> >         at sun.misc.Unsafe.park(Native Method)
> >         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> >         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
> >         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65)
> >         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
> >         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
> >         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> >         . . . . .
> >
> > What worries me is the fact that ‘hasNext’ is essentially a blocking 
> > operation. I can’t seem to find a single reason when it would be 
> > useful, hence I am calling it a bug, but hopefully someone can clarify.
> > Kafka version is 0.8.*
> >
> > Cheers
> > Oleg
> >
> >
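
The stack trace in the oldest message shows hasNext parked in
LinkedBlockingQueue.poll, and the reply points at consumer.timeout.ms as the
way to keep the iterator from blocking indefinitely. The sketch below models
that behavior with a plain queue-backed iterator. It is an illustration under
assumptions, not the old consumer's implementation: QueueIterator and
IteratorTimeoutException are hypothetical names, and treating a negative
timeout as "block forever" is a choice made for this sketch.

```java
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimeoutIteratorDemo {

    /** Hypothetical exception raised when no element arrives in time. */
    static class IteratorTimeoutException extends RuntimeException {
        IteratorTimeoutException(String message) {
            super(message);
        }
    }

    /** Iterator whose hasNext() blocks until an element arrives or the timeout expires. */
    static class QueueIterator<T> implements Iterator<T> {
        private final BlockingQueue<T> queue;
        private final long timeoutMs; // negative: block indefinitely (sketch assumption)
        private T next;

        QueueIterator(BlockingQueue<T> queue, long timeoutMs) {
            this.queue = queue;
            this.timeoutMs = timeoutMs;
        }

        @Override
        public boolean hasNext() {
            if (next != null) {
                return true;
            }
            try {
                next = (timeoutMs < 0)
                        ? queue.take()
                        : queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
            if (next == null) {
                throw new IteratorTimeoutException("no message within " + timeoutMs + " ms");
            }
            return true;
        }

        @Override
        public T next() {
            hasNext(); // fills 'next' or throws on timeout
            T result = next;
            next = null;
            return result;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("m1");
        Iterator<String> it = new QueueIterator<>(queue, 100);

        System.out.println("got: " + it.next());
        try {
            it.hasNext(); // queue is empty, so this waits and then times out
        } catch (IteratorTimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

With a non-negative timeout the caller regains control after the wait and can
check a shutdown flag or retry, which is the practical difference from an
iterator that only ever blocks.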
