Jack,

The fetchers are blocked on the queue since it is full, is your consumer
iterator stopped and hence not getting more data from it?

Guozhang

On Thu, Oct 23, 2014 at 3:53 PM, Jack Foy <j...@whitepages.com> wrote:

> Hi all,
>
> We run kafka 0.8.1.1. We’re tracking down a problem where consumer groups
> stop pulling from their respective partitions a few minutes or hours into
> execution. It looks like all ConsumerFetcherThreads associated with that
> consumer are blocking while waiting to write data to a LinkedBlockingQueue.
> They are waiting on ConditionObjects with different object IDs, and those
> object IDs do not occur elsewhere within our snapshot of thread data. It
> appears that those threads never make progress once they enter this waiting
> state.
>
> KAFKA-937 looks like a very similar symptom:
> https://issues.apache.org/jira/browse/KAFKA-937 According to Jun Rao’s
> comments on that issue, a ConsumerFetcherThread should never be blocked. Is
> that still the case?
>
> Here’s the thread dump for the relevant threads. I can provide more
> information if needed.
>
> "ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-0"
> prio=10 tid=0x00007f1954a9c800 nid=0xbf0 waiting on condition
> [0x00007f19339f8000]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x000000008ac24dd8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>     at
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
>     at
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>     at
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>     at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
>     at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
>     at scala.collection.immutable.Map$Map3.foreach(Map.scala:154)
>     at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
>     at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka.utils.Utils$.inLock(Utils.scala:538)
>     at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
>     at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> "ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-1"
> prio=10 tid=0x00007f1955657000 nid=0xbf3 waiting on condition
> [0x00007f19321e0000]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x000000008ad280e8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>     at
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
>     at
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>     at
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>     at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
>     at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
>     at scala.collection.immutable.Map$Map3.foreach(Map.scala:154)
>     at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
>     at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka.utils.Utils$.inLock(Utils.scala:538)
>     at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
>     at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> "ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-2"
> prio=10 tid=0x00007f1954001000 nid=0xbf1 waiting on condition
> [0x00007f19326e5000]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x000000008abf72c0> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>     at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>     at
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
>     at
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>     at
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>     at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
>     at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
>     at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
>     at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
>     at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka.utils.Utils$.inLock(Utils.scala:538)
>     at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
>     at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> --
> Jack Foy <j...@whitepages.com<mailto:j...@whitepages.com>>
>
>
>
>


-- 
-- Guozhang

Reply via email to