[ 
https://issues.apache.org/jira/browse/NIFI-1722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleg Zhurakousky updated NIFI-1722:
-----------------------------------
    Fix Version/s:     (was: 0.7.0)

> Intermittent deadlocks in Kafka API when stopping GetKafka
> ----------------------------------------------------------
>
>                 Key: NIFI-1722
>                 URL: https://issues.apache.org/jira/browse/NIFI-1722
>             Project: Apache NiFi
>          Issue Type: Bug
>            Reporter: Oleg Zhurakousky
>            Assignee: Oleg Zhurakousky
>
> It appears that Kafka deadlocks when
> _ConsumerConnector.commitOffsets(..)_ is executed during the stop call in
> GetKafka. Judging from the thread dump, this may be related to the fact that
> we shut down the consumer while onTrigger is still executing. However,
> onTrigger also appears to be deadlocked.
> Below are the relevant thread dump segments:
> {code}
> "StandardProcessScheduler Thread-7" Id=115 BLOCKED  on 
> java.lang.Object@2baae51
>       at 
> kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:333)
>       at 
> kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:324)
> "ConsumerFetcherThread-74993895-50e9-3962-90e3-97af1fed7294_daves-nifi-cluster-2-1459431214410-d3b5143c-0-1001"
>  Id=25120 WAITING  on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@23dc03a2
>       at sun.misc.Unsafe.park(Native Method)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>       at 
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
>       at 
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>       at 
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:53)
>       at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(AbstractFetcherThread.scala:142)
>       at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(AbstractFetcherThread.scala:126)
>       at scala.Option.foreach(Option.scala:236)
>       at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:126)
>       at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:123)
>       at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
>       at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:123)
>       at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:123)
>       at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:123)
>       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:122)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>       Number of Locked Synchronizers: 1
>       - java.util.concurrent.locks.ReentrantLock$NonfairSync@6b99259f
> "ConsumerFetcherThread-33156eec-156c-4b32-9598-d5fc3ca460ce_daves-nifi-cluster-2-1459519432175-5beddff4-0-1001"
>  Id=35266 RUNNABLE  (in native code)
>       at sun.nio.ch.Net.poll(Native Method)
>       at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:954)
>       - waiting on java.lang.Object@3e91ba2b
>       at 
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:204)
>       - waiting on java.lang.Object@494070b0
>       at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>       - waiting on sun.nio.ch.SocketAdaptor$SocketInputStream@3ade0aef
>       at 
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>       - waiting on java.lang.Object@7d49c744
>       at kafka.utils.CoreUtils$.read(CoreUtils.scala:192)
>       at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>       at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>       at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>       at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131)
>       at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
>       at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:78)
>       - waiting on java.lang.Object@4eba7a24
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:122)
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
>       at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:121)
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
>       at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
>       at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>       at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:120)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> "Timer-Driven Process Thread-10" Id=92 TIMED_WAITING  on 
> java.util.concurrent.CountDownLatch$Sync@7a46b8e8
>       at sun.misc.Unsafe.park(Native Method)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
>       at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
>       at 
> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:67)
>       at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:48)
>       at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
>       at 
> org.apache.nifi.processors.kafka.KafkaPublisher.processAcks(KafkaPublisher.java:152)
>       at 
> org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:141)
>       at 
> org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:299)
>       at 
> org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1807)
>       at 
> org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1778)
>       at 
> org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:295)
>       at 
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>       at 
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059)
>       at 
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
>       at 
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>       at 
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
>       Number of Locked Synchronizers: 1
>       - java.util.concurrent.ThreadPoolExecutor$Worker@2bc3a748
> "StandardProcessScheduler Thread-7" Id=115 BLOCKED  on 
> java.lang.Object@2baae51
>       at 
> kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:333)
>       at 
> kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:324)
>       at 
> kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:110)
>       at 
> org.apache.nifi.processors.kafka.GetKafka.shutdownConsumer(GetKafka.java:297)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:137)
>       at 
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:125)
>       at 
> org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotations(ReflectionUtils.java:233)
>       at 
> org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotation(ReflectionUtils.java:85)
>       at 
> org.apache.nifi.controller.StandardProcessorNode$2.run(StandardProcessorNode.java:1332)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
>       Number of Locked Synchronizers: 1
>       - java.util.concurrent.ThreadPoolExecutor$Worker@36a2bc42
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to