[ 
https://issues.apache.org/jira/browse/NIFI-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15210111#comment-15210111
 ] 

ASF GitHub Bot commented on NIFI-1684:
--------------------------------------

GitHub user olegz opened a pull request:

    https://github.com/apache/nifi/pull/302

    NIFI-1684 fixed NPE, added tests

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/olegz/nifi NIFI-1684

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/302.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #302
    
----
commit 409ad1051fc44a0ce8acf1a59ba8dac5261a688e
Author: Oleg Zhurakousky <o...@suitcase.io>
Date:   2016-03-24T11:37:58Z

    NIFI-1684 fixed NPE, added tests

----


> When the Kafka broker and Zookeeper instance used are replaced behavior is 
> poor
> -------------------------------------------------------------------------------
>
>                 Key: NIFI-1684
>                 URL: https://issues.apache.org/jira/browse/NIFI-1684
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 0.6.0
>            Reporter: Joseph Witt
>            Assignee: Oleg Zhurakousky
>             Fix For: 0.7.0
>
>
> The Kafka broker and zookeeper instance being used were replaced while NiFi 
> was running.
> During that time PutKafka had NullPointerExceptions such as:
> {quote}
> 2016-03-24 05:12:58,427 WARN [Timer-Driven Process Thread-5] 
> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding 
> PutKafka[id=f8b2f669-fec5-3b26-ad2b-bca
> dff0c6543] due to uncaught Exception: java.lang.NullPointerException
> 2016-03-24 05:12:58,429 WARN [Timer-Driven Process Thread-5] 
> o.a.n.c.t.ContinuallyRunProcessorTask
> java.lang.NullPointerException: null
>         at java.lang.String.<init>(String.java:566) ~[na:1.8.0_65]
>         at 
> org.apache.nifi.processors.kafka.SplittableMessageContext.getKeyBytesAsString(SplittableMessageContext.java:105)
>  ~[na:na]
>         at 
> org.apache.nifi.processors.kafka.PutKafka.buildFailedFlowFileAttributes(PutKafka.java:395)
>  ~[na:na]
>         at 
> org.apache.nifi.processors.kafka.PutKafka.onTrigger(PutKafka.java:308) 
> ~[na:na]
>         at 
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>  ~[nifi-api-0.6.0.jar:0.6.0]
>         at 
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1057)
>  ~[nifi-framework-core-0.6.0.jar:0.6.0]
>         at 
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
>  [nifi-framework-core-0.6.0.jar:0.6.0]
>         at 
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>  [nifi-framework-core-0.6.0.jar:0.6.0]
>         at 
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
>  [nifi-framework-core-0.6.0.jar:0.6.0]
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [na:1.8.0_65]
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
> [na:1.8.0_65]
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  [na:1.8.0_65]
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  [na:1.8.0_65]
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [na:1.8.0_65]
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [na:1.8.0_65]
>         at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65]
> {quote}
> But, GetKafka had no errors, stopped functioning, and became unresponsive to 
> attempts to stop it.  The 30 sec invoke quietly mechanism didn't seem to 
> address the issue either presumably because the thread is stuck on some 
> object monitor.  I tried to stop it so I could restart it but in attempting 
> to start it NiFi blocked me saying it was in STOPPING state.  So stack dump 
> taken and this appears relevant:
> {quote}
> "StandardProcessScheduler Thread-8" Id=142 BLOCKED  on 
> java.lang.Object@17820e22
>         at 
> kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:295)
>         at 
> kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:111)
>         at 
> org.apache.nifi.processors.kafka.GetKafka.shutdownConsumer(GetKafka.java:296)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at 
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:137)
>         at 
> org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:125)
>         at 
> org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotations(ReflectionUtils.java:233)
>         at 
> org.apache.nifi.util.ReflectionUtils.quietlyInvokeMethodsWithAnnotation(ReflectionUtils.java:85)
>         at 
> org.apache.nifi.controller.StandardProcessorNode$2.run(StandardProcessorNode.java:1330)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>         Number of Locked Synchronizers: 1
>         - java.util.concurrent.ThreadPoolExecutor$Worker@1f4d2630
> {quote}
> Then here i see the reference to that object in another stack:
> {quote}
> "kafka-consumer-scheduler-0" Id=213 TIMED_WAITING  on 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@5e93adf
>         at sun.misc.Unsafe.park(Native Method)
>         at 
> java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:256)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2120)
>         at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
>         at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
>         at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
>         at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
>         at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>         at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>         at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>         at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:326)
>         at 
> kafka.consumer.ZookeeperConsumerConnector.commitOffsetToZooKeeper(ZookeeperConsumerConnector.scala:283)
>         at 
> kafka.consumer.ZookeeperConsumerConnector$$anonfun$5.apply(ZookeeperConsumerConnector.scala:304)
>         at 
> kafka.consumer.ZookeeperConsumerConnector$$anonfun$5.apply(ZookeeperConsumerConnector.scala:303)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>         at 
> kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:303)
>         - waiting on java.lang.Object@17820e22
>         at 
> kafka.consumer.ZookeeperConsumerConnector.autoCommit(ZookeeperConsumerConnector.scala:271)
>         at 
> kafka.consumer.ZookeeperConsumerConnector$$anonfun$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:134)
>         at 
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:99)
>         at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>         Number of Locked Synchronizers: 1
>         - java.util.concurrent.ThreadPoolExecutor$Worker@4a713b18
> {quote}
> Which is waiting on 
> {quote}
> "9e31cdb4-9685-4b04-9164-1d737edd5f31_52.90.60.74-1458794505915-6302e1bb-leader-finder-thread"
>  Id=233 TIMED_WAITING  on java.util.concurrent.locks.AbstractQueuedSyn
> chronizer$ConditionObject@5e93adf
>         at sun.misc.Unsafe.park(Native Method)
>         at 
> java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:256)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2120)
>         at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
>         at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
>         at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
>         at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
>         at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:413)
>         at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:409)
>         at 
> kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:469)
>         at kafka.utils.ZkUtils$.getAllBrokersInCluster(ZkUtils.scala:81)
>         at 
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:65)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>         Number of Locked Synchronizers: 1
>         - java.util.concurrent.locks.ReentrantLock$NonfairSync@50bc97c7
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to