[ 
https://issues.apache.org/jira/browse/ARTEMIS-2642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mike Youngstrom updated ARTEMIS-2642:
-------------------------------------
    Description: 
Using the Qpid JMS AMQP client attempting to receive messages with no wait with 
a selector can produce very long drain times causing a Drain Timeout on the 
client.  If I change to using "receive()" (causing the qpid client to no longer 
send drain requests) the problem goes away.

Test Case using Qpid AMQP JMS client
{code:java}
   public static void main(String[] args) throws Exception {
    final String queueName = "queue";
    var connectionFactory =
        new JmsConnectionFactory(
            "tqadmin",
            "admin",
            
"amqp://localhost:5672?jms.prefetchPolicy.all=1&jms.connectTimeout=60000&amqp.drainTimeout=10000");
    connectionFactory.setExceptionListener(
        e -> {
          System.out.println("Got a JMSException.  Terminating the VM.");
          e.printStackTrace();
          Runtime.getRuntime().halt(100);
        });
    var sendCount = new LongAdder();
    var consumeCount = new LongAdder();
    var consumerThread =
        new Thread(
            () -> {
              try (var listenerContext =
                  connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE)) {
                try (var consumer =
                    listenerContext.createConsumer(
                        listenerContext.createQueue(queueName), 
"selector='dude'")) {
                  while (!Thread.interrupted()) {
                    while (consumer.receiveNoWait() != null) {
                      consumeCount.increment();
                      long consumed = consumeCount.sum();
                      if (consumed % 100 == 0) {
                        System.out.println("Messages Consumed: " + consumed);
                      }
                    }
                  }
                }
              }
            });
    consumerThread.start();
    try (var context = 
connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE)) {
      final Message message = context.createMessage();
      message.setStringProperty("selector", "dude");
      var producer = context.createProducer();
      var queue = context.createQueue(queueName);
      while (sendCount.sum() < 100000 && !Thread.interrupted()) {
        producer.send(queue, message);
        sendCount.increment();
        long sent = sendCount.sum();
        if (sent % 100 == 0) {
          System.out.println("Messages Sent: " + sent);
        }
      }
    }
  }

{code}
Error Thrown after about 2000 messages are consumed (in a default local 
environment)
{code:java}
Exception in thread "Thread-0" javax.jms.JMSRuntimeException: Remote did not 
respond to a drain request in time
        at 
org.apache.qpid.jms.exceptions.JmsExceptionSupport.createRuntimeException(JmsExceptionSupport.java:211)
        at org.apache.qpid.jms.JmsConsumer.receiveNoWait(JmsConsumer.java:100)
        at connections.TQTest2.lambda$1(TQTest2.java:33)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.qpid.jms.JmsOperationTimedOutException: Remote did not 
respond to a drain request in time
        at 
org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException.toJMSException(ProviderOperationTimedOutException.java:39)
        at 
org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException.toJMSException(ProviderOperationTimedOutException.java:1)
        at 
org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
        at 
org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
        at org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:915)
        at org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:899)
        at 
org.apache.qpid.jms.JmsMessageConsumer.performPullIfRequired(JmsMessageConsumer.java:726)
        at 
org.apache.qpid.jms.JmsMessageConsumer.dequeue(JmsMessageConsumer.java:332)
        at 
org.apache.qpid.jms.JmsMessageConsumer.receiveNoWait(JmsMessageConsumer.java:221)
        at org.apache.qpid.jms.JmsConsumer.receiveNoWait(JmsConsumer.java:98)
        ... 2 more
Caused by: 
org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException: 
Remote did not respond to a drain request in time
        at 
org.apache.qpid.jms.provider.amqp.AmqpConsumer.lambda$1(AmqpConsumer.java:179)
        at 
io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
        at 
io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127)
        at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
        at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        ... 1 more
{code}

  was:
Using the Qpid JMS AMQP client attempting to receive messages with no wait with 
a selector can produce very long drain times causing a Drain Timeout on the 
client.  If I change to using "receive()" (causing the qpid client to no longer 
send drain requests) the problem goes away

Test Case using Qpid AMQP JMS client
{code:java}
   public static void main(String[] args) throws Exception {
    final String queueName = "queue";
    var connectionFactory =
        new JmsConnectionFactory(
            "tqadmin",
            "admin",
            
"amqp://localhost:5672?jms.prefetchPolicy.all=1&jms.connectTimeout=60000&amqp.drainTimeout=10000");
    connectionFactory.setExceptionListener(
        e -> {
          System.out.println("Got a JMSException.  Terminating the VM.");
          e.printStackTrace();
          Runtime.getRuntime().halt(100);
        });
    var sendCount = new LongAdder();
    var consumeCount = new LongAdder();
    var consumerThread =
        new Thread(
            () -> {
              try (var listenerContext =
                  connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE)) {
                try (var consumer =
                    listenerContext.createConsumer(
                        listenerContext.createQueue(queueName), 
"selector='dude'")) {
                  while (!Thread.interrupted()) {
                    while (consumer.receiveNoWait() != null) {
                      consumeCount.increment();
                      long consumed = consumeCount.sum();
                      if (consumed % 100 == 0) {
                        System.out.println("Messages Consumed: " + consumed);
                      }
                    }
                  }
                }
              }
            });
    consumerThread.start();
    try (var context = 
connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE)) {
      final Message message = context.createMessage();
      message.setStringProperty("selector", "dude");
      var producer = context.createProducer();
      var queue = context.createQueue(queueName);
      while (sendCount.sum() < 100000 && !Thread.interrupted()) {
        producer.send(queue, message);
        sendCount.increment();
        long sent = sendCount.sum();
        if (sent % 100 == 0) {
          System.out.println("Messages Sent: " + sent);
        }
      }
    }
  }

{code}
Error Thrown after about 2000 messages are consumed (in a default local 
environment)
{code:java}
Exception in thread "Thread-0" javax.jms.JMSRuntimeException: Remote did not 
respond to a drain request in time
        at 
org.apache.qpid.jms.exceptions.JmsExceptionSupport.createRuntimeException(JmsExceptionSupport.java:211)
        at org.apache.qpid.jms.JmsConsumer.receiveNoWait(JmsConsumer.java:100)
        at connections.TQTest2.lambda$1(TQTest2.java:33)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.qpid.jms.JmsOperationTimedOutException: Remote did not 
respond to a drain request in time
        at 
org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException.toJMSException(ProviderOperationTimedOutException.java:39)
        at 
org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException.toJMSException(ProviderOperationTimedOutException.java:1)
        at 
org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
        at 
org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
        at org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:915)
        at org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:899)
        at 
org.apache.qpid.jms.JmsMessageConsumer.performPullIfRequired(JmsMessageConsumer.java:726)
        at 
org.apache.qpid.jms.JmsMessageConsumer.dequeue(JmsMessageConsumer.java:332)
        at 
org.apache.qpid.jms.JmsMessageConsumer.receiveNoWait(JmsMessageConsumer.java:221)
        at org.apache.qpid.jms.JmsConsumer.receiveNoWait(JmsConsumer.java:98)
        ... 2 more
Caused by: 
org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException: 
Remote did not respond to a drain request in time
        at 
org.apache.qpid.jms.provider.amqp.AmqpConsumer.lambda$1(AmqpConsumer.java:179)
        at 
io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
        at 
io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127)
        at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
        at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        ... 1 more
{code}


> Drain requests with a selector can cause Drain Timeouts
> -------------------------------------------------------
>
>                 Key: ARTEMIS-2642
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-2642
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 2.11.0
>            Reporter: Mike Youngstrom
>            Priority: Major
>
> Using the Qpid JMS AMQP client attempting to receive messages with no wait 
> with a selector can produce very long drain times causing a Drain Timeout on 
> the client.  If I change to using "receive()" (causing the qpid client to no 
> longer send drain requests) the problem goes away.
> Test Case using Qpid AMQP JMS client
> {code:java}
>    public static void main(String[] args) throws Exception {
>     final String queueName = "queue";
>     var connectionFactory =
>         new JmsConnectionFactory(
>             "tqadmin",
>             "admin",
>             
> "amqp://localhost:5672?jms.prefetchPolicy.all=1&jms.connectTimeout=60000&amqp.drainTimeout=10000");
>     connectionFactory.setExceptionListener(
>         e -> {
>           System.out.println("Got a JMSException.  Terminating the VM.");
>           e.printStackTrace();
>           Runtime.getRuntime().halt(100);
>         });
>     var sendCount = new LongAdder();
>     var consumeCount = new LongAdder();
>     var consumerThread =
>         new Thread(
>             () -> {
>               try (var listenerContext =
>                   connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE)) {
>                 try (var consumer =
>                     listenerContext.createConsumer(
>                         listenerContext.createQueue(queueName), 
> "selector='dude'")) {
>                   while (!Thread.interrupted()) {
>                     while (consumer.receiveNoWait() != null) {
>                       consumeCount.increment();
>                       long consumed = consumeCount.sum();
>                       if (consumed % 100 == 0) {
>                         System.out.println("Messages Consumed: " + consumed);
>                       }
>                     }
>                   }
>                 }
>               }
>             });
>     consumerThread.start();
>     try (var context = 
> connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE)) {
>       final Message message = context.createMessage();
>       message.setStringProperty("selector", "dude");
>       var producer = context.createProducer();
>       var queue = context.createQueue(queueName);
>       while (sendCount.sum() < 100000 && !Thread.interrupted()) {
>         producer.send(queue, message);
>         sendCount.increment();
>         long sent = sendCount.sum();
>         if (sent % 100 == 0) {
>           System.out.println("Messages Sent: " + sent);
>         }
>       }
>     }
>   }
> {code}
> Error Thrown after about 2000 messages are consumed (in a default local 
> environment)
> {code:java}
> Exception in thread "Thread-0" javax.jms.JMSRuntimeException: Remote did not 
> respond to a drain request in time
>       at 
> org.apache.qpid.jms.exceptions.JmsExceptionSupport.createRuntimeException(JmsExceptionSupport.java:211)
>       at org.apache.qpid.jms.JmsConsumer.receiveNoWait(JmsConsumer.java:100)
>       at connections.TQTest2.lambda$1(TQTest2.java:33)
>       at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.qpid.jms.JmsOperationTimedOutException: Remote did not 
> respond to a drain request in time
>       at 
> org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException.toJMSException(ProviderOperationTimedOutException.java:39)
>       at 
> org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException.toJMSException(ProviderOperationTimedOutException.java:1)
>       at 
> org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
>       at 
> org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
>       at org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:915)
>       at org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:899)
>       at 
> org.apache.qpid.jms.JmsMessageConsumer.performPullIfRequired(JmsMessageConsumer.java:726)
>       at 
> org.apache.qpid.jms.JmsMessageConsumer.dequeue(JmsMessageConsumer.java:332)
>       at 
> org.apache.qpid.jms.JmsMessageConsumer.receiveNoWait(JmsMessageConsumer.java:221)
>       at org.apache.qpid.jms.JmsConsumer.receiveNoWait(JmsConsumer.java:98)
>       ... 2 more
> Caused by: 
> org.apache.qpid.jms.provider.exceptions.ProviderOperationTimedOutException: 
> Remote did not respond to a drain request in time
>       at 
> org.apache.qpid.jms.provider.amqp.AmqpConsumer.lambda$1(AmqpConsumer.java:179)
>       at 
> io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
>       at 
> io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127)
>       at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
>       at 
> io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>       ... 1 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to