This can also happen if at least one of the consumer threads (not the
fetcher threads) dies. You can inspect the thread dump to check whether
all your consumer threads are still alive.
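
To illustrate the failure mode, here is a minimal standalone sketch. It is not
Kafka code: the class name, thread names, and queue capacity are all made up
for the example. A "fetcher" thread keeps calling put() on a bounded
LinkedBlockingQueue while the only "consumer" thread dies from an uncaught
exception. Once the queue fills up, the fetcher parks in put() and shows up as
WAITING, much like the ConsumerFetcherThreads in the dump quoted below.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class StalledFetcherDemo {

    /** Returns the state of the hypothetical fetcher thread after the consumer has died. */
    static Thread.State fetcherStateAfterConsumerDies() throws InterruptedException {
        // Small bounded queue standing in for the queue between fetcher and consumer.
        // The capacity of 2 is an arbitrary choice for this sketch.
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);

        // Consumer: takes a single item, then dies from an uncaught exception.
        Thread consumer = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            throw new IllegalStateException("simulated failure while handling a message");
        }, "demo-consumer");
        consumer.setUncaughtExceptionHandler(
                (t, e) -> System.out.println(t.getName() + " died: " + e.getMessage()));

        // Fetcher: keeps enqueueing; put() blocks whenever the queue is full.
        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; ; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted by the demo's cleanup
            }
        }, "demo-fetcher");
        fetcher.setDaemon(true);

        consumer.start();
        fetcher.start();
        consumer.join(); // the consumer is now dead, so nothing drains the queue

        // Give the fetcher up to five seconds to fill the queue and park in put().
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (fetcher.getState() != Thread.State.WAITING && System.nanoTime() < deadline) {
            Thread.sleep(10);
        }
        Thread.State state = fetcher.getState();
        fetcher.interrupt(); // clean up the demo thread
        return state;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("fetcher state: " + fetcherStateAfterConsumerDies());
    }
}
```

In a real thread dump the equivalent check is to look for the application's
own consumer threads by name: if one of them is missing, nothing is draining
its queue and the fetchers feeding it will stay parked.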

On Thu, Oct 23, 2014 at 5:05 PM, Guozhang Wang <[email protected]> wrote:

> Jack,
>
> The fetchers are blocked on the queue because it is full. Has your consumer
> iterator stopped, so that it is no longer taking data from the queue?
>
> Guozhang
>
> On Thu, Oct 23, 2014 at 3:53 PM, Jack Foy <[email protected]> wrote:
>
> > Hi all,
> >
> > We run Kafka 0.8.1.1. We’re tracking down a problem where consumer groups
> > stop pulling from their respective partitions a few minutes or hours into
> > execution. It looks like all ConsumerFetcherThreads associated with that
> > consumer are blocking while waiting to write data to a
> > LinkedBlockingQueue. They are waiting on ConditionObjects with different
> > object IDs, and those object IDs do not occur elsewhere within our
> > snapshot of thread data. It appears that those threads never make progress
> > once they enter this waiting state.
> >
> > KAFKA-937 looks like a very similar symptom:
> > https://issues.apache.org/jira/browse/KAFKA-937
> > According to Jun Rao’s comments on that issue, a ConsumerFetcherThread
> > should never be blocked. Is that still the case?
> >
> > Here’s the thread dump for the relevant threads. I can provide more
> > information if needed.
> >
> >
> > "ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-0" prio=10 tid=0x00007f1954a9c800 nid=0xbf0 waiting on condition [0x00007f19339f8000]
> >    java.lang.Thread.State: WAITING (parking)
> >     at sun.misc.Unsafe.park(Native Method)
> >     - parking to wait for  <0x000000008ac24dd8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> >     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> >     at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> >     at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> >     at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> >     at scala.collection.immutable.Map$Map3.foreach(Map.scala:154)
> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> >     at kafka.utils.Utils$.inLock(Utils.scala:538)
> >     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
> >     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >
> >
> > "ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-1" prio=10 tid=0x00007f1955657000 nid=0xbf3 waiting on condition [0x00007f19321e0000]
> >    java.lang.Thread.State: WAITING (parking)
> >     at sun.misc.Unsafe.park(Native Method)
> >     - parking to wait for  <0x000000008ad280e8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> >     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> >     at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> >     at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> >     at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> >     at scala.collection.immutable.Map$Map3.foreach(Map.scala:154)
> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> >     at kafka.utils.Utils$.inLock(Utils.scala:538)
> >     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
> >     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >
> >
> > "ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-2" prio=10 tid=0x00007f1954001000 nid=0xbf1 waiting on condition [0x00007f19326e5000]
> >    java.lang.Thread.State: WAITING (parking)
> >     at sun.misc.Unsafe.park(Native Method)
> >     - parking to wait for  <0x000000008abf72c0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> >     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> >     at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> >     at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> >     at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> >     at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> >     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> >     at kafka.utils.Utils$.inLock(Utils.scala:538)
> >     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
> >     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >
> > --
> > Jack Foy <[email protected]>
> >
>
> --
> -- Guozhang
>
