[ https://issues.apache.org/jira/browse/NIFI-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15213043#comment-15213043 ]
Oleg Zhurakousky commented on NIFI-1684:
----------------------------------------

Also, since this issue is still open, I am fixing the partitioner issue that was reported on the mailing list on 02/25/2016.

> When the Kafka broker and Zookeeper instance used are replaced behavior is poor
> -------------------------------------------------------------------------------
>
>                 Key: NIFI-1684
>                 URL: https://issues.apache.org/jira/browse/NIFI-1684
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 0.6.0
>            Reporter: Joseph Witt
>            Assignee: Oleg Zhurakousky
>             Fix For: 0.7.0
>
>
> The Kafka broker and ZooKeeper instance being used were replaced while NiFi was running.
> During that time PutKafka had NullPointerExceptions such as:
> {quote}
> 2016-03-24 05:12:58,427 WARN [Timer-Driven Process Thread-5] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding PutKafka[id=f8b2f669-fec5-3b26-ad2b-bcadff0c6543] due to uncaught Exception: java.lang.NullPointerException
> 2016-03-24 05:12:58,429 WARN [Timer-Driven Process Thread-5] o.a.n.c.t.ContinuallyRunProcessorTask
> java.lang.NullPointerException: null
>     at java.lang.String.<init>(String.java:566) ~[na:1.8.0_65]
>     at org.apache.nifi.processors.kafka.SplittableMessageContext.getKeyBytesAsString(SplittableMessageContext.java:105) ~[na:na]
>     at org.apache.nifi.processors.kafka.PutKafka.buildFailedFlowFileAttributes(PutKafka.java:395) ~[na:na]
>     at org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:308) ~[na:na]
>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-0.6.0.jar:0.6.0]
>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057) ~[nifi-framework-core-0.6.0.jar:0.6.0]
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-0.6.0.jar:0.6.0]
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-0.6.0.jar:0.6.0]
>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-0.6.0.jar:0.6.0]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_65]
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_65]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_65]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_65]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_65]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_65]
>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65]
> {quote}
> GetKafka, however, logged no errors, stopped functioning, and became unresponsive to attempts to stop it. The 30-second quiet-invoke mechanism didn't seem to address the issue either, presumably because the thread is stuck on some object monitor. I tried to stop it so I could restart it, but when I attempted to start it NiFi blocked me, saying it was in the STOPPING state. So I took a stack dump, and this appears relevant:
> {quote}
> "StandardProcessScheduler Thread-8" Id=142 BLOCKED on java.lang.Object@17820e22
>     at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:295)
>     at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:111)
>     at org.apache.nifi.processors.kafka.GetKafka.shutdownConsumer(GetKafka.java:296)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:137)
>     at org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:125)
>     at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotations(ReflectionUtils.java:233)
>     at org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotation(ReflectionUtils.java:85)
>     at org.apache.nifi.controller.StandardProcessorNode$2.run(StandardProcessorNode.java:1330)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>     Number of Locked Synchronizers: 1
>     - java.util.concurrent.ThreadPoolExecutor$Worker@1f4d2630
> {quote}
> Then here I see the reference to that object in another stack:
> {quote}
> "kafka-consumer-scheduler-0" Id=213 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@5e93adf
>     at sun.misc.Unsafe.park(Native Method)
>     at java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:256)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2120)
>     at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
>     at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
>     at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:326)
>     at kafka.consumer.ZookeeperConsumerConnector.commitOffsetToZooKeeper(ZookeeperConsumerConnector.scala:283)
>     at kafka.consumer.ZookeeperConsumerConnector$$anonfun$5.apply(ZookeeperConsumerConnector.scala:304)
>     at kafka.consumer.ZookeeperConsumerConnector$$anonfun$5.apply(ZookeeperConsumerConnector.scala:303)
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>     at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:303)
>     - waiting on java.lang.Object@17820e22
>     at kafka.consumer.ZookeeperConsumerConnector.autoCommit(ZookeeperConsumerConnector.scala:271)
>     at kafka.consumer.ZookeeperConsumerConnector$$anonfun$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:134)
>     at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:99)
>     at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>     Number of Locked Synchronizers: 1
>     - java.util.concurrent.ThreadPoolExecutor$Worker@4a713b18
> {quote}
> Which is waiting on:
> {quote}
> "9e31cdb4-9685-4b04-9164-1d737edd5f31_52.90.60.74-1458794505915-6302e1bb-leader-finder-thread" Id=233 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@5e93adf
>     at sun.misc.Unsafe.park(Native Method)
>     at java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:256)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2120)
>     at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
>     at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
>     at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
>     at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:413)
>     at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:409)
>     at kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:469)
>     at kafka.utils.ZkUtils$.getAllBrokersInCluster(ZkUtils.scala:81)
>     at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:65)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>     Number of Locked Synchronizers: 1
>     - java.util.concurrent.locks.ReentrantLock$NonfairSync@50bc97c7
> {quote}
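
For illustration only, here is a minimal sketch of two defensive patterns that the traces in this issue suggest. It is not the actual NiFi fix. The class name KafkaSafetySketch and both method names are hypothetical, and the sketch assumes the NPE came from passing a null key byte array to a String constructor and that the stop hang came from a shutdown call waiting indefinitely on a monitor held by another thread.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper class; none of these names exist in NiFi or Kafka.
final class KafkaSafetySketch {

    /** Returns the key as a UTF-8 string, or null when no key bytes were set. */
    static String keyBytesAsString(byte[] keyBytes) {
        // Passing a null byte[] to a String constructor throws
        // NullPointerException, so guard before converting.
        return keyBytes == null ? null : new String(keyBytes, StandardCharsets.UTF_8);
    }

    /**
     * Runs a potentially blocking task (for example a consumer shutdown) on a
     * daemon thread and waits at most the given timeout for it to finish.
     * Returns true only if the task completed normally within the timeout.
     */
    static boolean runWithTimeout(Runnable task, long timeout, TimeUnit unit) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-shutdown");
            t.setDaemon(true); // a stuck worker must not keep the JVM alive
            return t;
        });
        Future<?> future = executor.submit(task);
        try {
            future.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            // Interrupts the worker; a thread BLOCKED on a monitor ignores the
            // interrupt, but the caller is released either way.
            future.cancel(true);
            return false;
        } catch (ExecutionException e) {
            return false; // the task itself threw an exception
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("null key handled: " + (keyBytesAsString(null) == null));
        boolean finished = runWithTimeout(() -> {
            try {
                Thread.sleep(5_000); // stands in for a call that never returns
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 200, TimeUnit.MILLISECONDS);
        System.out.println("shutdown finished in time: " + finished);
    }
}
```

The timeout wrapper only frees the calling thread. It does not release the monitor that the worker is waiting on, so the underlying ZooKeeper reconnect wait would still need its own bound.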