[ 
https://issues.apache.org/jira/browse/NIFI-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15210733#comment-15210733
 ] 

Oleg Zhurakousky commented on NIFI-1684:
----------------------------------------

One other issue
{code}
java.lang.NullPointerException: null
at 
org.apache.nifi.processors.kafka.PutKafka.buildMessageContext(PutKafka.java:411)
 ~[na:na]
at org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:293) 
~[na:na]
at 
org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
 ~[nifi-api-0.5.1.1.1.2.1-37.jar:0.5.1.1.1.2.1-37]
. . . 
{code}

> When the Kafka broker and Zookeeper instance used are replaced behavior is 
> poor
> -------------------------------------------------------------------------------
>
>                 Key: NIFI-1684
>                 URL: https://issues.apache.org/jira/browse/NIFI-1684
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 0.6.0
>            Reporter: Joseph Witt
>            Assignee: Oleg Zhurakousky
>             Fix For: 0.7.0
>
>
> The Kafka broker and zookeeper instance being used were replaced while NiFi 
> was running.
> During that time PutKafka had NullPointerExceptions such as:
> {quote}
> 2016-03-24 05:12:58,427 WARN [Timer-Driven Process Thread-5] 
> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding 
> PutKafka[id=f8b2f669-fec5-3b26-ad2b-bca
> dff0c6543] due to uncaught Exception: java.lang.NullPointerException
> 2016-03-24 05:12:58,429 WARN [Timer-Driven Process Thread-5] 
> o.a.n.c.t.ContinuallyRunProcessorTask
> java.lang.NullPointerException: null
>         at java.lang.String.<init>(String.java:566) ~[na:1.8.0_65]
>         at 
> org.apache.nifi.processors.kafka.SplittableMessageContext.getKeyBytesAsString(SplittableMessageContext.java:105)
>  ~[na:na]
>         at 
> org.apache.nifi.processors.kafka.PutKafka.buildFailedFlowFileAttributes(PutKafka.java:395)
>  ~[na:na]
>         at 
> org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:308) 
> ~[na:na]
>         at 
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>  ~[nifi-api-0.6.0.jar:0.6.0]
>         at 
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057)
>  ~[nifi-framework-core-0.6.0.jar:0.6.0]
>         at 
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
>  [nifi-framework-core-0.6.0.jar:0.6.0]
>         at 
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>  [nifi-framework-core-0.6.0.jar:0.6.0]
>         at 
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
>  [nifi-framework-core-0.6.0.jar:0.6.0]
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [na:1.8.0_65]
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
> [na:1.8.0_65]
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  [na:1.8.0_65]
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  [na:1.8.0_65]
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [na:1.8.0_65]
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [na:1.8.0_65]
>         at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65]
> {quote}
> But, GetKafka had no errors, stopped functioning, and became unresponsive to 
> attempts to stop it.  The 30 sec invoke quietly mechanism didn't seem to 
> address the issue either presumably because the thread is stuck on some 
> object monitor.  I tried to stop it so I could restart it but in attempting 
> to start it NiFi blocked me saying it was in STOPPING state.  So stack dump 
> taken and this appears relevant:
> {quote}
> "StandardProcessScheduler Thread-8" Id=142 BLOCKED  on 
> java.lang.Object@17820e22
>         at 
> kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:295)
>         at 
> kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:111)
>         at 
> org.apache.nifi.processors.kafka.GetKafka.shutdownConsumer(GetKafka.java:296)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at 
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:137)
>         at 
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:125)
>         at 
> org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotations(ReflectionUtils.java:233)
>         at 
> org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotation(ReflectionUtils.java:85)
>         at 
> org.apache.nifi.controller.StandardProcessorNode$2.run(StandardProcessorNode.java:1330)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>         Number of Locked Synchronizers: 1
>         - java.util.concurrent.ThreadPoolExecutor$Worker@1f4d2630
> {quote}
> Then here i see the reference to that object in another stack:
> {quote}
> "kafka-consumer-scheduler-0" Id=213 TIMED_WAITING  on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@5e93adf
>         at sun.misc.Unsafe.park(Native Method)
>         at 
> java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:256)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2120)
>         at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
>         at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
>         at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
>         at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
>         at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>         at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>         at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>         at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:326)
>         at 
> kafka.consumer.ZookeeperConsumerConnector.commitOffsetToZooKeeper(ZookeeperConsumerConnector.scala:283)
>         at 
> kafka.consumer.ZookeeperConsumerConnector$$anonfun$5.apply(ZookeeperConsumerConnector.scala:304)
>         at 
> kafka.consumer.ZookeeperConsumerConnector$$anonfun$5.apply(ZookeeperConsumerConnector.scala:303)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>         at 
> kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:303)
>         - waiting on java.lang.Object@17820e22
>         at 
> kafka.consumer.ZookeeperConsumerConnector.autoCommit(ZookeeperConsumerConnector.scala:271)
>         at 
> kafka.consumer.ZookeeperConsumerConnector$$anonfun$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:134)
>         at 
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:99)
>         at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>         Number of Locked Synchronizers: 1
>         - java.util.concurrent.ThreadPoolExecutor$Worker@4a713b18
> {quote}
> Which is waiting on 
> {quote}
> "9e31cdb4-9685-4b04-9164-1d737edd5f31_52.90.60.74-1458794505915-6302e1bb-leader-finder-thread"
>  Id=233 TIMED_WAITING  on java.util.concurrent.locks.AbstractQueuedSyn
> chronizer$ConditionObject@5e93adf
>         at sun.misc.Unsafe.park(Native Method)
>         at 
> java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:256)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2120)
>         at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
>         at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
>         at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
>         at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
>         at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:413)
>         at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:409)
>         at 
> kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:469)
>         at kafka.utils.ZkUtils$.getAllBrokersInCluster(ZkUtils.scala:81)
>         at 
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:65)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>         Number of Locked Synchronizers: 1
>         - java.util.concurrent.locks.ReentrantLock$NonfairSync@50bc97c7
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to