Hi there,

Just to clarify, is the broker still on 0.8? Unfortunately, the new
consumer requires a 0.9 broker, so that would probably explain the hang.
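
If poll() can still overrun its timeout for some other reason, one possible
guard is a watchdog that calls consumer.wakeup() once a deadline passes.
wakeup() is thread-safe and makes the blocked thread throw WakeupException.
Below is a rough sketch using only java.util.concurrent. PollWatchdog and
arm() are names made up for illustration, the Runnable passed in is assumed
to be consumer::wakeup, and none of this has been run against a real broker.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: runs a wakeup action if a guarded call overruns its deadline. */
public class PollWatchdog implements AutoCloseable {

    // Single daemon timer thread, so the watchdog never keeps the JVM alive.
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "poll-watchdog");
                t.setDaemon(true);
                return t;
            });

    /**
     * Schedules the wakeup action to run after deadlineMs milliseconds.
     * Cancel the returned future once the guarded call has returned.
     */
    public ScheduledFuture<?> arm(Runnable wakeup, long deadlineMs) {
        return timer.schedule(wakeup, deadlineMs, TimeUnit.MILLISECONDS);
    }

    /** Stops the timer thread and drops any pending wakeups. */
    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```

In the poll loop it would be used roughly like this (15000 ms is an
arbitrary deadline, chosen to be longer than the poll timeout):

    ScheduledFuture<?> guard = watchdog.arm(consumer::wakeup, 15000);
    try {
        records = consumer.poll(10000);
    } finally {
        guard.cancel(false);  // poll returned, so disarm the watchdog
    }

Note that the loop quoted below treats WakeupException as a shutdown signal,
so a watchdog firing would end the loop. There is also a small race: if the
watchdog fires just after poll() returns but before cancel(), the next
poll() will throw instead.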

-Jason

On Thu, Mar 3, 2016 at 2:28 AM, Muthukumaran K <muthukumara...@ericsson.com>
wrote:

> Ewen,
>
> By the new Consumer API, do you mean KafkaConsumer? I have an issue with
> poll() in 0.9.0.1: it hangs indefinitely even when given a timeout.
>
> Following is the consumer code I am using. Any pointers would be
> helpful.
>
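> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
>
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.errors.WakeupException;
>
> public class ConsumerLoop implements Runnable {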
>
>
>     private final KafkaConsumer<String, String> consumer;
>     private final List<String> topics;
>     private final int id;
>
>     public ConsumerLoop(int id,
>                       String groupId,
>                       List<String> topics) {
>         this.id = id;
>         this.topics = topics;
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "192.168.56.101:9092");
>         props.put("group.id", groupId);
>         props.put("auto.offset.reset", "earliest");
>         props.put("key.deserializer",
>                   "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer",
>                   "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("metadata.fetch.timeout.ms", 1);
>
>         this.consumer = new KafkaConsumer<>(props);
>
>     }
>
>     @Override
>     public void run() {
>         try {
>
>             System.out.println("Starting consumer ID : " + id +
>                     " Thread : " + Thread.currentThread().getName() +
>                     " Topic : " + topics.toString() +
>                     " ... ");
>             long startTime = System.currentTimeMillis();
>             int recordCount = 0;
>
>           consumer.subscribe(topics);
>
>           System.out.println("Consumer-ID " + id + " after subscribe...");
>
>           while (true) {
>             ConsumerRecords<String, String> records = consumer.poll(10000);
>
>             System.out.println("Consumer-ID " + id + " after poll...");
>
>
>             for (ConsumerRecord<String, String> record : records) {
>               Map<String, Object> data = new HashMap<>();
>               data.put("partition", record.partition());
>               data.put("offset", record.offset());
>               data.put("value", record.value());
>               System.out.println(
>                       "Consumer-ID : " + this.id +
>                               ": " + data +
>                               " Thread_name : " +
>                               Thread.currentThread().getName());
>               recordCount++;
>
>             }
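>             long endTime = System.currentTimeMillis();
>             // Elapsed whole seconds, clamped to 1 so the rate below cannot
>             // divide by zero during the first second.
>             long duration = Math.max(1, (endTime - startTime) / 1000);
>             System.out.println("###### rate : " + recordCount / duration +
>                     " msgs/sec on Consumer ID " + id);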
>
>           }
>         } catch (WakeupException e) {
>           // ignore for shutdown
>         } finally {
>           consumer.close();
>         }
>     }
>
>     public void shutdown() {
>         // wakeup() is thread-safe; a blocked poll() throws WakeupException.
>         consumer.wakeup();
>     }
> }
>
> Regards
> Muthu
>
>
> -----Original Message-----
> From: Ewen Cheslack-Postava [mailto:e...@confluent.io]
> Sent: Thursday, March 03, 2016 2:21 PM
> To: users@kafka.apache.org
> Subject: Re: Consumer deadlock
>
> Take a look at the consumer.timeout.ms setting if you don't want the
> iterator to block indefinitely.
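
For reference, a minimal sketch of what that setting might look like with
the old 0.8 high-level consumer. Only java.util.Properties is used here so
the snippet stands alone. The class name, ZooKeeper address, group id and
the 5000 ms value are placeholders, not anything taken from Oleg's setup.
With the default of -1 the iterator blocks forever; with a non-negative
value, hasNext() throws kafka.consumer.ConsumerTimeoutException once no
message has arrived within that interval.

```java
import java.util.Properties;

public class OldConsumerTimeoutConfig {

    /** Builds properties for the 0.8 high-level consumer with a bounded iterator wait. */
    public static Properties build(String zookeeper, String groupId, long timeoutMs) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zookeeper);
        props.setProperty("group.id", groupId);
        // Default is -1 (block indefinitely). A non-negative value makes
        // hasNext() throw ConsumerTimeoutException when the wait expires.
        props.setProperty("consumer.timeout.ms", Long.toString(timeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Properties props = build("localhost:2181", "example-group", 5000L);
        System.out.println(props.getProperty("consumer.timeout.ms")); // prints 5000
    }
}
```

The resulting Properties would then be wrapped in a ConsumerConfig as usual,
and the code iterating the stream needs to catch ConsumerTimeoutException
and decide whether to retry or stop.
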
>
> And a better long term solution is to switch to the new consumer, but that
> obviously requires much more significant code changes. The new consumer API
> is a single-threaded poll-based API where you can always specify timeouts
> to the consumer's poll() method (though it currently has some limitations
> to how it enforces that timeout).
>
> -Ewen
>
> On Wed, Mar 2, 2016 at 11:24 AM, Oleg Zhurakousky
> <ozhurakou...@hortonworks.com> wrote:
>
> > Guys
> >
> > We have a consumer deadlock and here is the relevant dump:
> >
> > "Timer-Driven Process Thread-10" Id=76 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@39775787
> >         at sun.misc.Unsafe.park(Native Method)
> >         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> >         at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
> >         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65)
> >         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
> >         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
> >         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> >         . . . . .
> >
> > What worries me is the fact that ‘hasNext’ is essentially a blocking
> > operation. I can’t seem to find a single reason when it would be
> > useful, hence I am calling it a bug, but hopefully someone can clarify.
> > Kafka version is 0.8.*
> >
> > Cheers
> > Oleg
> >
> >
>
>
> --
> Thanks,
> Ewen
>
