[ 
https://issues.apache.org/jira/browse/NIFI-10885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nandor Soma Abonyi reassigned NIFI-10885:
-----------------------------------------

    Assignee: Nandor Soma Abonyi

> ConsumeMQTT should stop client threads
> --------------------------------------
>
>                 Key: NIFI-10885
>                 URL: https://issues.apache.org/jira/browse/NIFI-10885
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 1.18.0
>            Reporter: Peter Turcsanyi
>            Assignee: Nandor Soma Abonyi
>            Priority: Minor
>
> When {{ConsumeMQTT}} is stopped, some client threads keep running, which 
> prevents NiFi from stopping cleanly.
> {code:java}
> 2022-11-28 13:14:01,126 INFO [main] org.apache.nifi.bootstrap.Command NiFi 
> PID [13987] shutdown in progress...
> 2022-11-28 13:14:03,140 INFO [main] org.apache.nifi.bootstrap.Command NiFi 
> PID [13987] shutdown in progress...
> 2022-11-28 13:14:05,152 INFO [main] org.apache.nifi.bootstrap.Command NiFi 
> PID [13987] shutdown in progress...
> 2022-11-28 13:14:05,162 WARN [main] org.apache.nifi.bootstrap.Command NiFi 
> PID [13987] shutdown not completed after 20 seconds: Killing process{code}
> It affects the v5 client version and may be an issue in the underlying client 
> library.
> Suspicious threads:
> {code:java}
> "com.hivemq.client.mqtt-1-1@20668" prio=10 tid=0x69 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>       at sun.nio.ch.KQueue.poll(KQueue.java:-1)
>       at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:122)
>       at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:129)
>       - locked <0x522a> (a sun.nio.ch.KQueueSelectorImpl)
>       - locked <0x522b> (a io.netty.channel.nio.SelectedSelectionKeySet)
>       at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:141)
>       at 
> io.netty.channel.nio.SelectedSelectionKeySetSelector.select(SelectedSelectionKeySetSelector.java:62)
>       at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:883)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:526)
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
>       at 
> io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>       at 
> io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>       at 
> java.lang.Thread.run(Thread.java:833)
>
> "RxComputationThreadPool-1@20859" daemon prio=5 tid=0x6a nid=NA waiting
>   java.lang.Thread.State: WAITING
>       at jdk.internal.misc.Unsafe.park(Unsafe.java:-1)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:341)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(AbstractQueuedSynchronizer.java:506)
>       at 
> java.util.concurrent.ForkJoinPool.unmanagedBlock(ForkJoinPool.java:3463)
>       at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3434)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1623)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1170)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899)
>       at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1062)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1122)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>       at java.lang.Thread.run(Thread.java:833)
>
> "RxSchedulerPurge-1@20122" daemon prio=5 tid=0x67 nid=NA waiting
>   java.lang.Thread.State: WAITING
>       at jdk.internal.misc.Unsafe.park(Unsafe.java:-1)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:252)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1672)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1182)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899)
>       at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1062)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1122)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>       at 
> java.lang.Thread.run(Thread.java:833)
>
> "RxCachedWorkerPoolEvictor-1@20139" daemon prio=5 tid=0x68 nid=NA waiting
>   java.lang.Thread.State: WAITING
>       at jdk.internal.misc.Unsafe.park(Unsafe.java:-1)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:252)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1672)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1182)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899)
>       at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1062)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1122)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>       at java.lang.Thread.run(Thread.java:833) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to