[ https://issues.apache.org/jira/browse/ARTEMIS-2642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mike Youngstrom updated ARTEMIS-2642: ------------------------------------- Description: Using the Qpid JMS AMQP client attempting to receive messages with no wait with a selector can produce very long drain times causing a Drain Timeout on the client. Test Case using Qpid AMQP JMS client {code:java} public static void main(String[] args) throws Exception { final String queueName = "queue"; var connectionFactory = new JmsConnectionFactory( "tqadmin", "admin", "amqp://localhost:5672?jms.prefetchPolicy.all=1&jms.connectTimeout=60000&transport.trustAll=true&transport.verifyHost=false&amqp.drainTimeout=10000"); connectionFactory.setExceptionListener( e -> { System.out.println("Got a JMSException. Terminating the VM."); e.printStackTrace(); Runtime.getRuntime().halt(100); }); var sendCount = new LongAdder(); var consumeCount = new LongAdder(); var consumerThread = new Thread( () -> { try (var listenerContext = connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE)) { try (var consumer = listenerContext.createConsumer( listenerContext.createQueue(queueName), "selector='dude'")) { while (!Thread.interrupted()) { while (consumer.receiveNoWait() != null) { consumeCount.increment(); long consumed = consumeCount.sum(); if (consumed % 100 == 0) { System.out.println("Messages Consumed: " + consumed); } } } } } }); consumerThread.start(); try (var context = connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE)) { final Message message = context.createMessage(); message.setStringProperty("selector", "dude"); var producer = context.createProducer(); var queue = context.createQueue(queueName); while (sendCount.sum() < 100000 && !Thread.interrupted()) { producer.send(queue, message); sendCount.increment(); long sent = sendCount.sum(); if (sent % 100 == 0) { System.out.println("Messages Sent: " + sent); } } } } {code} Error Thrown after about 2000 messages are consumed (in a default local environment) {code} Exception in thread "Thread-0" javax.jms.JMSRuntimeException: Remote did not respond to a drain request in time at org.apache.qpid.jms.exceptions.JmsExceptionSupport.createRuntimeException(JmsExceptionSupport.java:211) at org.apache.qpid.jms.JmsConsumer.receiveNoWait(JmsConsumer.java:100) at connections.TQTest2.lambda$1(TQTest2.java:33) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: org.apache.qpid.jms.JmsOperationTimedOutException: Remote did not respond to a drain request in time at org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException.toJMSException(ProviderOperationTimedOutException.java:39) at org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException.toJMSException(ProviderOperationTimedOutException.java:1) at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80) at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112) at org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:915) at org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:899) at org.apache.qpid.jms.JmsMessageConsumer.performPullIfRequired(JmsMessageConsumer.java:726) at org.apache.qpid.jms.JmsMessageConsumer.dequeue(JmsMessageConsumer.java:332) at org.apache.qpid.jms.JmsMessageConsumer.receiveNoWait(JmsMessageConsumer.java:221) at org.apache.qpid.jms.JmsConsumer.receiveNoWait(JmsConsumer.java:98) ... 2 more Caused by: org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException: Remote did not respond to a drain request in time at org.apache.qpid.jms.provider.amqp.AmqpConsumer.lambda$1(AmqpConsumer.java:179) at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38) at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ... 1 more {code} was: Using the Qpid JMS AMQP client attempting to receive messages with no wait with a selector can produce very long drain times causing a Drain Timeout on the client. Test Case using Qpid AMQP JMS client {code:java} public static void main(String[] args) throws Exception { final String queueName = "queue"; var connectionFactory = new JmsConnectionFactory( "tqadmin", "admin", "amqp://localhost:5672?jms.prefetchPolicy.all=1&jms.connectTimeout=60000&transport.trustAll=true&transport.verifyHost=false&amqp.drainTimeout=10000"); connectionFactory.setExceptionListener( e -> { System.out.println("Got a JMSException. Terminating the VM."); e.printStackTrace(); Runtime.getRuntime().halt(100); }); var sendCount = new LongAdder(); var consumeCount = new LongAdder(); var consumerThread = new Thread( () -> { try (var listenerContext = connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE)) { try (var consumer = listenerContext.createConsumer( listenerContext.createQueue(queueName), "selector='dude'")) { while (!Thread.interrupted()) { while (consumer.receiveNoWait() != null) { consumeCount.increment(); long consumed = consumeCount.sum(); if (consumed % 100 == 0) { System.out.println("Messages Consumed: " + consumed); } } } } } }); consumerThread.start(); try (var context = connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE)) { final Message message = context.createMessage(); message.setStringProperty("selector", "dude"); var producer = context.createProducer(); var queue = context.createQueue(queueName); while (sendCount.sum() < 100000 && !Thread.interrupted()) { producer.send(queue, message); sendCount.increment(); long sent = sendCount.sum(); if (sent % 100 == 0) { System.out.println("Messages Sent: " + sent); } } } } {code} Error Thrown after about 2000 messages are consumed (in a default local environment) {code:java} Exception in thread "Thread-0" javax.jms.JMSRuntimeException: Remote did not respond to a drain request in timeException in thread "Thread-0" javax.jms.JMSRuntimeException: Remote did not respond to a drain request in time at org.apache.qpid.jms.exceptions.JmsExceptionSupport.createRuntimeException(JmsExceptionSupport.java:211) at org.apache.qpid.jms.JmsConsumer.receiveNoWait(JmsConsumer.java:100) at connections.TQTest2.lambda$1(TQTest2.java:33) at java.base/java.lang.Thread.run(Thread.java:834)Caused by: org.apache.qpid.jms.JmsOperationTimedOutException: Remote did not respond to a drain request in time at org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException.toJMSException(ProviderOperationTimedOutException.java:39) at org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException.toJMSException(ProviderOperationTimedOutException.java:1) at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80) at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112) at org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:915) at org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:899) at org.apache.qpid.jms.JmsMessageConsumer.performPullIfRequired(JmsMessageConsumer.java:726) at org.apache.qpid.jms.JmsMessageConsumer.dequeue(JmsMessageConsumer.java:332) at org.apache.qpid.jms.JmsMessageConsumer.receiveNoWait(JmsMessageConsumer.java:221) at org.apache.qpid.jms.JmsConsumer.receiveNoWait(JmsConsumer.java:98) ... 2 moreCaused by: org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException: Remote did not respond to a drain request in time at org.apache.qpid.jms.provider.amqp.AmqpConsumer.lambda$1(AmqpConsumer.java:179) at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38) at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ... 1 more {code} > Drain requests with a selector can cause Drain Timeouts > ------------------------------------------------------- > > Key: ARTEMIS-2642 > URL: https://issues.apache.org/jira/browse/ARTEMIS-2642 > Project: ActiveMQ Artemis > Issue Type: Bug > Components: Broker > Affects Versions: 2.11.0 > Reporter: Mike Youngstrom > Priority: Major > > Using the Qpid JMS AMQP client attempting to receive messages with no wait > with a selector can produce very long drain times causing a Drain Timeout on > the client. > Test Case using Qpid AMQP JMS client > {code:java} > public static void main(String[] args) throws Exception { > final String queueName = "queue"; > var connectionFactory = > new JmsConnectionFactory( > "tqadmin", > "admin", > > "amqp://localhost:5672?jms.prefetchPolicy.all=1&jms.connectTimeout=60000&transport.trustAll=true&transport.verifyHost=false&amqp.drainTimeout=10000"); > connectionFactory.setExceptionListener( > e -> { > System.out.println("Got a JMSException. Terminating the VM."); > e.printStackTrace(); > Runtime.getRuntime().halt(100); > }); > var sendCount = new LongAdder(); > var consumeCount = new LongAdder(); > var consumerThread = > new Thread( > () -> { > try (var listenerContext = > connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE)) { > try (var consumer = > listenerContext.createConsumer( > listenerContext.createQueue(queueName), > "selector='dude'")) { > while (!Thread.interrupted()) { > while (consumer.receiveNoWait() != null) { > consumeCount.increment(); > long consumed = consumeCount.sum(); > if (consumed % 100 == 0) { > System.out.println("Messages Consumed: " + consumed); > } > } > } > } > } > }); > consumerThread.start(); > try (var context = > connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE)) { > final Message message = context.createMessage(); > message.setStringProperty("selector", "dude"); > var producer = context.createProducer(); > var queue = context.createQueue(queueName); > while (sendCount.sum() < 100000 && !Thread.interrupted()) { > producer.send(queue, message); > sendCount.increment(); > long sent = sendCount.sum(); > if (sent % 100 == 0) { > System.out.println("Messages Sent: " + sent); > } > } > } > } > {code} > Error Thrown after about 2000 messages are consumed (in a default local > environment) > {code} > Exception in thread "Thread-0" javax.jms.JMSRuntimeException: Remote did not > respond to a drain request in time > at > org.apache.qpid.jms.exceptions.JmsExceptionSupport.createRuntimeException(JmsExceptionSupport.java:211) > at org.apache.qpid.jms.JmsConsumer.receiveNoWait(JmsConsumer.java:100) > at connections.TQTest2.lambda$1(TQTest2.java:33) > at java.base/java.lang.Thread.run(Thread.java:834) > Caused by: org.apache.qpid.jms.JmsOperationTimedOutException: Remote did not > respond to a drain request in time > at > org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException.toJMSException(ProviderOperationTimedOutException.java:39) > at > org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException.toJMSException(ProviderOperationTimedOutException.java:1) > at > org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80) > at > org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112) > at org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:915) > at org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:899) > at > org.apache.qpid.jms.JmsMessageConsumer.performPullIfRequired(JmsMessageConsumer.java:726) > at > org.apache.qpid.jms.JmsMessageConsumer.dequeue(JmsMessageConsumer.java:332) > at > org.apache.qpid.jms.JmsMessageConsumer.receiveNoWait(JmsMessageConsumer.java:221) > at org.apache.qpid.jms.JmsConsumer.receiveNoWait(JmsConsumer.java:98) > ... 2 more > Caused by: > org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException: > Remote did not respond to a drain request in time > at > org.apache.qpid.jms.provider.amqp.AmqpConsumer.lambda$1(AmqpConsumer.java:179) > at > io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38) > at > io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127) > at > io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at > io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) > at > io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) > ... 1 more > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)