[ https://issues.apache.org/jira/browse/NIFI-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16883731#comment-16883731 ]
Piotr commented on NIFI-6437: ----------------------------- similar situation: problem with hbase/hdfs and ConsumerAMQP was creating more and more connection to rabbitmq. and the same rabbitmq "explode" {code:java} 2019-06-21 14:14:38,296 ERROR [Timer-Driven Process Thread-5] org.apache.nifi.hbase.PutHBaseRecord PutHBaseRecord[id=4d45e18a-0169-1000-ffff-ffffe0cc73f6] Failed to put records to HBase.: org.apache.hadoo p.hbase.client.RetriesExhaustedWithDetailsException: Failed 158 actions: ConnectionClosingException: 158 times, org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 158 actions: ConnectionClosingException: 158 times, at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.makeException(AsyncProcess.java:228) at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.access$1700(AsyncProcess.java:208) at org.apache.hadoop.hbase.client.AsyncProcess.waitForAllPreviousOpsAndReset(AsyncProcess.java:1689) at org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:208) at org.apache.hadoop.hbase.client.BufferedMutatorImpl.flush(BufferedMutatorImpl.java:183) at org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:1449) at org.apache.hadoop.hbase.client.HTable.close(HTable.java:1485) at org.apache.nifi.hbase.HBase_1_1_2_ClientService.put(HBase_1_1_2_ClientService.java:421) at sun.reflect.GeneratedMethodAccessor204.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:84) at com.sun.proxy.$Proxy232.put(Unknown Source) at org.apache.nifi.hbase.PutHBaseRecord.addBatch(PutHBaseRecord.java:204) at org.apache.nifi.hbase.PutHBaseRecord.onTrigger(PutHBaseRecord.java:277) at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165) at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203) at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 2019-06-21 14:14:38,336 ERROR [RabbitMQ Error On Write Thread] c.r.c.impl.ForgivingExceptionHandler An unexpected connection driver error occured java.net.SocketException: Broken pipe (Write failed) at java.net.SocketOutputStream.socketWrite0(Native Method) at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111) at java.net.SocketOutputStream.write(SocketOutputStream.java:155) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) at java.io.DataOutputStream.flush(DataOutputStream.java:123) at com.rabbitmq.client.impl.SocketFrameHandler.flush(SocketFrameHandler.java:177) at com.rabbitmq.client.impl.AMQConnection.flush(AMQConnection.java:571) at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:134) at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:447) at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:423) at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:416) at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:89) at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:436) at org.apache.nifi.amqp.processors.AMQPConsumer.acknowledge(AMQPConsumer.java:104) at org.apache.nifi.amqp.processors.ConsumeAMQP.processResource(ConsumeAMQP.java:154) at org.apache.nifi.amqp.processors.ConsumeAMQP.processResource(ConsumeAMQP.java:47) at org.apache.nifi.amqp.processors.AbstractAMQPProcessor.onTrigger(AbstractAMQPProcessor.java:163) at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165) at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203) at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} > a lot of connections from ConsumerAMQP > -------------------------------------- > > Key: NIFI-6437 > URL: https://issues.apache.org/jira/browse/NIFI-6437 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework > Affects Versions: 1.9.2 > Reporter: Piotr > Priority: Critical > > hi, when I try to connect to not existing queue on rabbitmq via ConsumerAMQP, > nifi was creating more and more connection to rabbitmq. after few minutes > rabbitmq "explode". i saw this error in logs: > {code:java} > 2019-07-12 13:20:29,853 INFO [Timer-Driven Process Thread-3] > o.a.n.c.s.TimerDrivenSchedulingAgent Stopped scheduling > ConsumeAMQP[id=e5e84018-016b-1000-edd7-e0de56297dd9] to run > 2019-07-12 13:20:29,856 INFO [Timer-Driven Process Thread-1] > o.a.nifi.amqp.processors.AMQPConsumer Successfully connected AMQPConsumer to > amqp://guest@172.17.0.4:5672/ and 'test_queue' queue > 2019-07-12 13:20:29,860 ERROR [Timer-Driven Process Thread-1] > o.a.nifi.amqp.processors.ConsumeAMQP > ConsumeAMQP[id=e5e84018-016b-1000-edd7-e0de56297dd9] Failed to process > session due to org.apache.nifi.processor.exception.ProcessException: Failed > to connect to AMQP Broker: > org.apache.nifi.processor.exception.ProcessException: Failed to connect to > AMQP Broker > org.apache.nifi.processor.exception.ProcessException: Failed to connect to > AMQP Broker > at > org.apache.nifi.amqp.processors.ConsumeAMQP.createAMQPWorker(ConsumeAMQP.java:197) > at > org.apache.nifi.amqp.processors.ConsumeAMQP.createAMQPWorker(ConsumeAMQP.java:47) > at > org.apache.nifi.amqp.processors.AbstractAMQPProcessor.createResource(AbstractAMQPProcessor.java:202) > at > org.apache.nifi.amqp.processors.AbstractAMQPProcessor.onTrigger(AbstractAMQPProcessor.java:156) > at > org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) > at > org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162) > at > org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:209) > at > org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117) > at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.IOException: null > at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126) > at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122) > at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1378) > at > com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:540) > at > com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:494) > at > com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicConsume(AutorecoveringChannel.java:472) > at > org.apache.nifi.amqp.processors.AMQPConsumer.<init>(AMQPConsumer.java:77) > at > org.apache.nifi.amqp.processors.ConsumeAMQP.createAMQPWorker(ConsumeAMQP.java:193) > ... 15 common frames omitted > Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; > protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND > - no queue 'test_queue' in vhost '/', class-id=60, method-id=20) > at > com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) > at > com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) > at > com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494) > at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1372) > ... 20 common frames omitted > Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; > protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND > - no queue 'test_queue' in vhost '/', class-id=60, method-id=20) > at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:516) > at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346) > at > com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:178) > at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:111) > at > com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:670) > at > com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48) > at > com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:597) > ... 1 common frames omitted{code} -- This message was sent by Atlassian JIRA (v7.6.14#76016)