[ https://issues.apache.org/jira/browse/NIFI-1722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15317808#comment-15317808 ]
Joseph Witt commented on NIFI-1722: ----------------------------------- Is this being worked on, or should it be moved off 0.7.0? > Intermittent deadlocks in Kafka API when stopping GetKafka > --------------------------------------------------------------- > > Key: NIFI-1722 > URL: https://issues.apache.org/jira/browse/NIFI-1722 > Project: Apache NiFi > Issue Type: Bug > Reporter: Oleg Zhurakousky > Assignee: Oleg Zhurakousky > Fix For: 0.7.0 > > > It appears that Kafka gets into a deadlock state when > _ConsumerConnector.commitOffsets(..)_ is executed during the stop call in > GetKafka. Looking at the thread dump it may be related to the fact that we > are shutting down the consumer when onTrigger is still executing. But it also > appears that onTrigger is in a deadlock state as well. > Below are the relevant thread dump segments > {code} > "StandardProcessScheduler Thread-7" Id=115 BLOCKED on > java.lang.Object@2baae51 > at > kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:333) > at > kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:324) > "ConsumerFetcherThread-74993895-50e9-3962-90e3-97af1fed7294_daves-nifi-cluster-2-1459431214410-d3b5143c-0-1001" > Id=25120 WAITING on > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@23dc03a2 > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) > at > java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350) > at > kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60) > at > kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:53) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(AbstractFetcherThread.scala:142) > at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$3.apply(AbstractFetcherThread.scala:126) > at scala.Option.foreach(Option.scala:236) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:126) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:123) > at scala.collection.immutable.Map$Map2.foreach(Map.scala:130) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:123) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:123) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:123) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:298) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:122) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) > Number of Locked Synchronizers: 1 > - java.util.concurrent.locks.ReentrantLock$NonfairSync@6b99259f > "ConsumerFetcherThread-33156eec-156c-4b32-9598-d5fc3ca460ce_daves-nifi-cluster-2-1459519432175-5beddff4-0-1001" > Id=35266 RUNNABLE (in native code) > at sun.nio.ch.Net.poll(Native Method) > at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:954) > - waiting on java.lang.Object@3e91ba2b > at > sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:204) > - waiting on java.lang.Object@494070b0 > at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) > - waiting on sun.nio.ch.SocketAdaptor$SocketInputStream@3ade0aef > at > java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) > - waiting on java.lang.Object@7d49c744 > at 
kafka.utils.CoreUtils$.read(CoreUtils.scala:192) > at > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > at > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131) > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81) > at > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:78) > - waiting on java.lang.Object@4eba7a24 > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:122) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:122) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:121) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:121) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:120) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) > "Timer-Driven Process Thread-10" Id=92 TIMED_WAITING on > java.util.concurrent.CountDownLatch$Sync@7a46b8e8 > at sun.misc.Unsafe.park(Native Method) > at > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037) > at > 
java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277) > at > org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:67) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:48) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25) > at > org.apache.nifi.processors.kafka.KafkaPublisher.processAcks(KafkaPublisher.java:152) > at > org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:141) > at > org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:299) > at > org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1807) > at > org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1778) > at > org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:295) > at > org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) > at > org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1059) > at > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) > at > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) > at > org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at > 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Number of Locked Synchronizers: 1 > - java.util.concurrent.ThreadPoolExecutor$Worker@2bc3a748 > "StandardProcessScheduler Thread-7" Id=115 BLOCKED on > java.lang.Object@2baae51 > at > kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:333) > at > kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:324) > at > kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:110) > at > org.apache.nifi.processors.kafka.GetKafka.shutdownConsumer(GetKafka.java:297) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:137) > at > org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:125) > at > org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotations(ReflectionUtils.java:233) > at > org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotation(ReflectionUtils.java:85) > at > org.apache.nifi.controller.StandardProcessorNode$2.run(StandardProcessorNode.java:1332) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Number of Locked Synchronizers: 1 > - java.util.concurrent.ThreadPoolExecutor$Worker@36a2bc42 > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)