[ https://issues.apache.org/jira/browse/NIFI-1722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328191#comment-15328191 ]

Oleg Zhurakousky commented on NIFI-1722:
----------------------------------------

Technically, there is not much we can do about this short of writing our own
custom consumer that speaks the Kafka protocol directly, possibly reusing some
of the existing low-level Kafka API (something I've toyed around with).
The underlying problem is that Kafka exposes blocking calls (see
https://issues.apache.org/jira/browse/KAFKA-2391) that have not yet been
addressed, and there is no ETA for a fix.
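
Short of a custom consumer, one general Java mitigation for blocking calls like those referenced in KAFKA-2391 is to keep the stop path from waiting forever: run the blocking call on a separate worker thread and stop waiting after a timeout. The sketch below assumes that approach; `BoundedCall` and `callWithTimeout` are hypothetical names, not NiFi or Kafka APIs, and the helper does not make the underlying call return.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not part of NiFi or Kafka): runs a possibly blocking
// call on a worker thread and stops waiting for it after a timeout.
// It does NOT unblock the call itself; it only bounds how long the caller waits.
final class BoundedCall {
    private BoundedCall() {}

    static <T> Optional<T> callWithTimeout(ExecutorService worker, Callable<T> blockingCall,
                                           long timeout, TimeUnit unit) {
        Future<T> future = worker.submit(blockingCall);
        try {
            return Optional.ofNullable(future.get(timeout, unit));
        } catch (TimeoutException e) {
            // Stop waiting and interrupt the worker; this only helps if the call
            // responds to interruption (a thread BLOCKED on a monitor does not).
            future.cancel(true);
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            future.cancel(true);
            return Optional.empty();
        } catch (ExecutionException e) {
            throw new IllegalStateException("blocking call failed", e.getCause());
        }
    }
}
```

Because `future.cancel(true)` only interrupts the worker, a call stuck waiting for a monitor (such as the thread BLOCKED in `commitOffsets` in the dump) would keep its worker thread, so this bounds the caller's wait rather than fixing the hang.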


> Intermittent deadlocks in Kafka API when stopping GetKafka
> ----------------------------------------------------------
>
>                 Key: NIFI-1722
>                 URL: https://issues.apache.org/jira/browse/NIFI-1722
>             Project: Apache NiFi
>          Issue Type: Bug
>            Reporter: Oleg Zhurakousky
>            Assignee: Oleg Zhurakousky
>             Fix For: 0.7.0
>
>
> It appears that Kafka deadlocks when _ConsumerConnector.commitOffsets(..)_ is
> executed during the stop call in GetKafka. Judging from the thread dump, this
> may be related to the fact that we shut down the consumer while onTrigger is
> still executing. However, onTrigger also appears to be deadlocked.
> Below are the relevant thread dump segments:
> {code}
> "ConsumerFetcherThread-74993895-50e9-3962-90e3-97af1fed7294_daves-nifi-cluster-2-1459431214410-d3b5143c-0-1001" Id=25120 WAITING  on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@23dc03a2
>       at sun.misc.Unsafe.park(Native Method)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>       at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
>       at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>       at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:53)
>       at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(AbstractFetcherThread.scala:142)
>       at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(AbstractFetcherThread.scala:126)
>       at scala.Option.foreach(Option.scala:236)
>       at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:126)
>       at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:123)
>       at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
>       at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:123)
>       at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:123)
>       at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:123)
>       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
>       at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:122)
>       at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>       Number of Locked Synchronizers: 1
>       - java.util.concurrent.locks.ReentrantLock$NonfairSync@6b99259f
> "ConsumerFetcherThread-33156eec-156c-4b32-9598-d5fc3ca460ce_daves-nifi-cluster-2-1459519432175-5beddff4-0-1001" Id=35266 RUNNABLE  (in native code)
>       at sun.nio.ch.Net.poll(Native Method)
>       at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:954)
>       - waiting on java.lang.Object@3e91ba2b
>       at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:204)
>       - waiting on java.lang.Object@494070b0
>       at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>       - waiting on sun.nio.ch.SocketAdaptor$SocketInputStream@3ade0aef
>       at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>       - waiting on java.lang.Object@7d49c744
>       at kafka.utils.CoreUtils$.read(CoreUtils.scala:192)
>       at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>       at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>       at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>       at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131)
>       at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
>       at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:78)
>       - waiting on java.lang.Object@4eba7a24
>       at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:122)
>       at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
>       at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
>       at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>       at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:121)
>       at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
>       at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
>       at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>       at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:120)
>       at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
>       at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> "Timer-Driven Process Thread-10" Id=92 TIMED_WAITING  on java.util.concurrent.CountDownLatch$Sync@7a46b8e8
>       at sun.misc.Unsafe.park(Native Method)
>       at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
>       at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
>       at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
>       at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:67)
>       at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:48)
>       at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
>       at org.apache.nifi.processors.kafka.KafkaPublisher.processAcks(KafkaPublisher.java:152)
>       at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:141)
>       at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:299)
>       at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1807)
>       at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1778)
>       at org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:295)
>       at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>       at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059)
>       at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
>       at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>       at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
>       Number of Locked Synchronizers: 1
>       - java.util.concurrent.ThreadPoolExecutor$Worker@2bc3a748
> "StandardProcessScheduler Thread-7" Id=115 BLOCKED  on java.lang.Object@2baae51
>       at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:333)
>       at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:324)
>       at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:110)
>       at org.apache.nifi.processors.kafka.GetKafka.shutdownConsumer(GetKafka.java:297)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:137)
>       at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:125)
>       at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotations(ReflectionUtils.java:233)
>       at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotation(ReflectionUtils.java:85)
>       at org.apache.nifi.controller.StandardProcessorNode$2.run(StandardProcessorNode.java:1332)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
>       Number of Locked Synchronizers: 1
>       - java.util.concurrent.ThreadPoolExecutor$Worker@36a2bc42
> {code}
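
The ConsumerFetcherThread with Id=25120 in the dump is WAITING in `LinkedBlockingQueue.put` (called from `PartitionTopicInfo.enqueue`) while it holds a `ReentrantLock` taken through `CoreUtils.inLock`. The sketch below is a simplified, self-contained model of that pattern, not Kafka's actual code; `FullQueueModel` and its method are hypothetical names. It assumes the queue stays full because nothing is draining it, and shows that a plain `put()` keeps the lock held indefinitely while a timed `offer()` eventually releases it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Simplified model (not Kafka's code) of the fetcher pattern in the thread dump:
// a thread enqueues into a bounded queue while holding a lock. If nothing drains
// the queue, put() parks indefinitely and the lock is never released.
final class FullQueueModel {
    private FullQueueModel() {}

    /** Returns true if another thread could take the fetcher's lock within waitMillis. */
    static boolean lockAvailableWhileQueueFull(boolean useTimedOffer, long waitMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("already-queued"); // queue is now full and nobody is draining it
        ReentrantLock fetchLock = new ReentrantLock();
        CountDownLatch lockHeld = new CountDownLatch(1);

        Thread fetcher = new Thread(() -> {
            fetchLock.lock();
            try {
                lockHeld.countDown();
                if (useTimedOffer) {
                    // Bounded wait: give up after 50 ms so the lock gets released.
                    queue.offer("next-chunk", 50, TimeUnit.MILLISECONDS);
                } else {
                    // Unbounded wait, like the put() call seen in the dump.
                    queue.put("next-chunk");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                fetchLock.unlock();
            }
        });
        fetcher.setDaemon(true);
        fetcher.start();

        try {
            lockHeld.await(); // make sure the fetcher owns the lock first
            boolean acquired = fetchLock.tryLock(waitMillis, TimeUnit.MILLISECONDS);
            if (acquired) {
                fetchLock.unlock();
            }
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            fetcher.interrupt(); // unblock the model thread so it releases the lock and exits
            try {
                fetcher.join(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

That the real fetcher's queue was full because nothing was draining it is an inference from the dump, not something the dump proves.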



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
