[ https://issues.apache.org/jira/browse/NIFI-1722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15328191#comment-15328191 ]
Oleg Zhurakousky commented on NIFI-1722:
----------------------------------------

Technically, there is not much we can do about it short of writing our own custom consumer directly against the protocol, possibly reusing some of the existing low-level Kafka API (something I've toyed around with). Basically, Kafka exposes blocking calls (see https://issues.apache.org/jira/browse/KAFKA-2391) which are not yet addressed, and there is no ETA.

> Intermittent deadlocks in Kafka API when stopping GetKafka
> ----------------------------------------------------------
>
>                 Key: NIFI-1722
>                 URL: https://issues.apache.org/jira/browse/NIFI-1722
>             Project: Apache NiFi
>          Issue Type: Bug
>            Reporter: Oleg Zhurakousky
>            Assignee: Oleg Zhurakousky
>             Fix For: 0.7.0
>
>
> It appears that Kafka enters a deadlock state when
> _ConsumerConnector.commitOffsets(..)_ is executed during the stop call in
> GetKafka. Looking at the thread dump, it may be related to the fact that we
> are shutting down the consumer while onTrigger is still executing. But it also
> appears that onTrigger is in a deadlock state as well.
> Below are the relevant thread dump segments
> {code}
> "StandardProcessScheduler Thread-7" Id=115 BLOCKED on java.lang.Object@2baae51
>     at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:333)
>     at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:324)
>
> "ConsumerFetcherThread-74993895-50e9-3962-90e3-97af1fed7294_daves-nifi-cluster-2-1459431214410-d3b5143c-0-1001" Id=25120 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@23dc03a2
>     at sun.misc.Unsafe.park(Native Method)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>     at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
>     at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>     at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:53)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(AbstractFetcherThread.scala:142)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(AbstractFetcherThread.scala:126)
>     at scala.Option.foreach(Option.scala:236)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:126)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:123)
>     at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:123)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:123)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:123)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298)
>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:122)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>     Number of Locked Synchronizers: 1
>     - java.util.concurrent.locks.ReentrantLock$NonfairSync@6b99259f
>
> "ConsumerFetcherThread-33156eec-156c-4b32-9598-d5fc3ca460ce_daves-nifi-cluster-2-1459519432175-5beddff4-0-1001" Id=35266 RUNNABLE (in native code)
>     at sun.nio.ch.Net.poll(Native Method)
>     at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:954)
>     - waiting on java.lang.Object@3e91ba2b
>     at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:204)
>     - waiting on java.lang.Object@494070b0
>     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>     - waiting on sun.nio.ch.SocketAdaptor$SocketInputStream@3ade0aef
>     at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>     - waiting on java.lang.Object@7d49c744
>     at kafka.utils.CoreUtils$.read(CoreUtils.scala:192)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131)
>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:78)
>     - waiting on java.lang.Object@4eba7a24
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:122)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:121)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:120)
>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
> "Timer-Driven Process Thread-10" Id=92 TIMED_WAITING on java.util.concurrent.CountDownLatch$Sync@7a46b8e8
>     at sun.misc.Unsafe.park(Native Method)
>     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
>     at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
>     at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:67)
>     at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:48)
>     at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
>     at org.apache.nifi.processors.kafka.KafkaPublisher.processAcks(KafkaPublisher.java:152)
>     at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:141)
>     at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:299)
>     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1807)
>     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1778)
>     at org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:295)
>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059)
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>     Number of Locked Synchronizers: 1
>     - java.util.concurrent.ThreadPoolExecutor$Worker@2bc3a748
>
> "StandardProcessScheduler Thread-7" Id=115 BLOCKED on java.lang.Object@2baae51
>     at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:333)
>     at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:324)
>     at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:110)
>     at org.apache.nifi.processors.kafka.GetKafka.shutdownConsumer(GetKafka.java:297)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:137)
>     at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:125)
>     at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotations(ReflectionUtils.java:233)
>     at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotation(ReflectionUtils.java:85)
>     at org.apache.nifi.controller.StandardProcessorNode$2.run(StandardProcessorNode.java:1332)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>     Number of Locked Synchronizers: 1
>     - java.util.concurrent.ThreadPoolExecutor$Worker@36a2bc42
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
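
For illustration only: the dump shows the stop path (GetKafka.shutdownConsumer) blocked inside commitOffsets. One generic way for a caller to avoid hanging forever on a blocking call it cannot change is to run that call on a helper thread and wait for a bounded time. The Java sketch below shows that general technique under stated assumptions. It is not the fix applied to NIFI-1722 and it uses no Kafka API; `BoundedShutdown` and `runWithTimeout` are hypothetical names, and the sleeping task in `main` merely stands in for a blocking call.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of NiFi or Kafka.
public class BoundedShutdown {

    /**
     * Runs a potentially blocking action on a daemon helper thread and waits
     * at most the given timeout for it to finish.
     *
     * @return true if the action completed in time, false if the wait was
     *         abandoned (timeout or interruption of the calling thread)
     */
    public static boolean runWithTimeout(Runnable action, long timeout, TimeUnit unit) {
        // Daemon thread: a stuck action cannot keep the JVM alive on exit.
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-shutdown");
            t.setDaemon(true);
            return t;
        });
        Future<?> future = executor.submit(action);
        try {
            future.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            // Interrupts the helper. Note that an interrupt does not release a
            // thread that is BLOCKED on a monitor, so the helper may linger.
            future.cancel(true);
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            future.cancel(true);
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("action failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // A fast action completes within the timeout.
        System.out.println(runWithTimeout(() -> { }, 1, TimeUnit.SECONDS));

        // Stand-in for a blocking call: sleeps far longer than the caller waits.
        Runnable slow = () -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println(runWithTimeout(slow, 100, TimeUnit.MILLISECONDS));
    }
}
```

The trade-off is that the caller regains control but the underlying call is not actually unblocked. If the helper is stuck on a monitor, as in the dump above, it stays stuck, which is why this only limits the symptom rather than removing the blocking behaviour in the client library.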