Hi Jason, I am using 0.9 broker.
One more observation. I had written producer code with 0.9 - Even with Producer code, I had hanging issue where send method was hanging requesting metadata. Thread-dump below

"main" prio=6 tid=0x0000000002238000 nid=0x1390 in Object.wait() [0x00000000025bf000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00000007aecacea0> (a org.apache.kafka.clients.Metadata)
        at org.apache.kafka.clients.Metadata.awaitUpdate(Metadata.java:121)
        - locked <0x00000007aecacea0> (a org.apache.kafka.clients.Metadata)
        at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:483)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:412)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:339)
        at kafkaexperiments.ProducerDriver.main(ProducerDriver.java:41)

Then I included metadata.fetch.timeout.ms=1 and then producer started working. But when I poll the same topic using kafka-console-consumer.sh, console-consumer also hangs.

Regards
Muthu

-----Original Message-----
From: Jason Gustafson [mailto:ja...@confluent.io]
Sent: Friday, March 04, 2016 5:33 AM
To: users@kafka.apache.org
Subject: Re: Consumer deadlock

Hi there,

Just to clarify, is the broker still on 0.8? Unfortunately, the new consumer needs 0.9. That probably would explain the hanging.

-Jason

On Thu, Mar 3, 2016 at 2:28 AM, Muthukumaran K <muthukumara...@ericsson.com> wrote:

> Ewen,
>
> By new Consumer API, you mean KafkaConsumer ? I have an issue with a
> poll in 0.9.0.1. poll hangs indefinitely even with the timeout
>
> Following is the consumer code which I am using.
> Any pointers would be helpful
>
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
>
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.errors.WakeupException;
>
> public class ConsumerLoop implements Runnable {
>
>     private final KafkaConsumer<String, String> consumer;
>     private final List<String> topics;
>     private final int id;
>
>     public ConsumerLoop(int id,
>                         String groupId,
>                         List<String> topics) {
>         this.id = id;
>         this.topics = topics;
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "192.168.56.101:9092");
>         props.put("group.id", groupId);
>         props.put("auto.offset.reset", "earliest");
>         props.put("key.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("metadata.fetch.timeout.ms", 1);
>
>         this.consumer = new KafkaConsumer<>(props);
>     }
>
>     @Override
>     public void run() {
>         try {
>             System.out.println("Starting consumer ID : " + id +
>                     " Thread : " + Thread.currentThread().getName() +
>                     " Topic : " + topics.toString() +
>                     " ... ");
>             long startTime = System.currentTimeMillis();
>             int recordCount = 0;
>
>             consumer.subscribe(topics);
>
>             System.out.println("Consumer-ID " + id + " after subscribe...");
>
>             while (true) {
>                 ConsumerRecords<String, String> records = consumer.poll(10000);
>
>                 System.out.println("Consumer-ID " + id + " after poll...");
>
>                 for (ConsumerRecord<String, String> record : records) {
>                     Map<String, Object> data = new HashMap<>();
>                     data.put("partition", record.partition());
>                     data.put("offset", record.offset());
>                     data.put("value", record.value());
>                     System.out.println(
>                             "Consumer-ID : " + this.id +
>                             ": " + data +
>                             " Thread_name : " + Thread.currentThread().getName());
>                     recordCount++;
>                 }
>                 long endTime = System.currentTimeMillis();
>                 // Clamp to 1 second so the rate does not divide by zero during the first second.
>                 long duration = Math.max(1, (endTime - startTime) / 1000);
>                 System.out.println("###### rate : " + recordCount / duration +
>                         " msgs/sec on Consumer ID " + id);
>             }
>         } catch (WakeupException e) {
>             // ignore for shutdown
>         } finally {
>             consumer.close();
>         }
>     }
>
>     public void shutdown() {
>         consumer.wakeup();
>     }
> }
>
> Regards
> Muthu
>
>
> -----Original Message-----
> From: Ewen Cheslack-Postava [mailto:e...@confluent.io]
> Sent: Thursday, March 03, 2016 2:21 PM
> To: users@kafka.apache.org
> Subject: Re: Consumer deadlock
>
> Take a look at the consumer.timeout.ms setting if you don't want the
> iterator to block indefinitely.
>
> And a better long term solution is to switch to the new consumer, but
> that obviously requires much more significant code changes. The new
> consumer API is a single-threaded poll-based API where you can always
> specify timeouts to the consumer's poll() method (though it currently
> has some limitations to how it enforces that timeout).
>
> -Ewen
>
> On Wed, Mar 2, 2016 at 11:24 AM, Oleg Zhurakousky <
> ozhurakou...@hortonworks.com> wrote:
>
> > Guys
> >
> > We have a consumer deadlock and here is the relevant dump:
> >
> > "Timer-Driven Process Thread-10" Id=76 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@39775787
> >     at sun.misc.Unsafe.park(Native Method)
> >     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> >     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> >     at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
> >     at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65)
> >     at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
> >     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> >     . . . . .
> >
> > What worries me is the fact that ‘hasNext’ is essentially a blocking
> > operation. I can’t seem to find a single reason when it would be
> > useful, hence I am calling it a bug, but hopefully someone can clarify.
> >
> > Kafka version is 0.8.*
> >
> > Cheers
> > Oleg
>
> --
> Thanks,
> Ewen
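
The pattern in Oleg's stack trace (an iterator whose hasNext ends up in LinkedBlockingQueue.poll) and the effect of a timeout setting such as consumer.timeout.ms can be modeled with the JDK alone. What follows is a minimal sketch and not Kafka's implementation: TimeoutIteratorSketch, TimeoutIterator and IteratorTimeoutException are hypothetical names, and the convention that a negative timeout means "wait forever" is an assumption made for the illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimeoutIteratorSketch {

    /** Hypothetical exception raised when no element arrives within the timeout. */
    public static class IteratorTimeoutException extends RuntimeException {
    }

    /** Hypothetical iterator whose hasNext() waits on a queue, as in the stack trace. */
    public static class TimeoutIterator<T> {
        private final BlockingQueue<T> queue;
        private final long timeoutMs; // assumption: negative means block indefinitely
        private T next;               // element fetched by hasNext() but not yet consumed

        public TimeoutIterator(BlockingQueue<T> queue, long timeoutMs) {
            this.queue = queue;
            this.timeoutMs = timeoutMs;
        }

        public boolean hasNext() {
            if (next != null) {
                return true;
            }
            try {
                if (timeoutMs < 0) {
                    next = queue.take(); // blocks until an element is available
                } else {
                    next = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
            if (next == null) {
                // poll() returned nothing within the timeout
                throw new IteratorTimeoutException();
            }
            return true;
        }

        public T next() {
            hasNext();
            T value = next;
            next = null;
            return value;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("m1");

        TimeoutIterator<String> it = new TimeoutIterator<>(queue, 100);
        System.out.println(it.next());
        try {
            it.hasNext(); // queue is empty now, so this gives up after 100 ms
        } catch (IteratorTimeoutException e) {
            System.out.println("timed out instead of blocking forever");
        }
    }
}
```

With a negative timeout the second hasNext() call would never return, which is the behaviour reported at the start of the thread; with a non-negative timeout the caller gets an exception it can handle and can decide whether to retry or shut down.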