[ 
https://issues.apache.org/jira/browse/QPIDJMS-458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16872498#comment-16872498
 ] 

Christian Danner edited comment on QPIDJMS-458 at 6/25/19 4:27 PM:
-------------------------------------------------------------------

While we were unable to reproduce the problem outlined above, we are now 
experiencing a similar issue in our production environment when attempting to 
receive messages from a queue to which all messages from external communication 
partners are replicated. As in the situation above, the thread that attempts to 
receive the next message is stuck at the location shown in the attached stack 
trace.

When this happens, the queue we read from fills up but no messages are 
forwarded to the client anymore. Unfortunately the situation cannot be 
reproduced on demand and occurs only sporadically (once or twice a week; our 
system runs 24/7). It seems that whenever it happens, the broker logs an 
AMQ222151 error right before processing stops (see attachment).

Our system has to handle large messages (up to ~40 MB) with a binary payload 
encoded using Google protobuf, which arrive at intervals of ~10-20 seconds. 
Previously we received messages via the JmsMessageConsumer.receive() method, 
but we have since switched to a version that passes a timeout of 60 seconds; 
if no message arrives within that timeout we completely recreate the broker 
connection (and the queue if necessary) so that our client does not get stuck.
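
For reference, our receive loop is roughly equivalent to the following 
simplified sketch (everything except the standard javax.jms API is a 
placeholder, not our actual code):

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;

public final class ReceiveLoopSketch {

    private static final long RECEIVE_TIMEOUT_MS = 60_000;

    private final ConnectionFactory factory; // e.g. a JmsConnectionFactory
    private final String queueName;
    private volatile boolean running = true;

    public ReceiveLoopSketch(ConnectionFactory factory, String queueName) {
        this.factory = factory;
        this.queueName = queueName;
    }

    public void run() throws Exception {
        Connection connection = connect();
        try {
            MessageConsumer consumer = createConsumer(connection);
            while (running) {
                Message message = consumer.receive(RECEIVE_TIMEOUT_MS);
                if (message == null) {
                    // No message within 60s: drop the broker connection and
                    // rebuild it (and the consumer) so the client does not
                    // stay stuck on a dead consumer.
                    closeQuietly(connection);
                    connection = connect();
                    consumer = createConsumer(connection);
                    continue;
                }
                process(message); // protobuf decoding happens here
            }
        } finally {
            closeQuietly(connection);
        }
    }

    private Connection connect() throws Exception {
        Connection connection = factory.createConnection();
        connection.start();
        return connection;
    }

    private MessageConsumer createConsumer(Connection connection) throws Exception {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(queueName);
        return session.createConsumer(queue);
    }

    private void process(Message message) {
        // placeholder: decode the protobuf payload and hand it off
    }

    private static void closeQuietly(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception ignored) {
            }
        }
    }
}
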
At first this seemed to help, but our client still stops processing from time 
to time. After looking at the implementation of the pull method in 
org.apache.qpid.jms.JmsConnection, I'm wondering why no timeout is passed to 
request.sync() even though one is supplied by the caller:

    void pull(JmsConsumerId consumerId, long timeout, ProviderSynchronization synchronization) throws JMSException {
        checkClosedOrFailed();

        try {
            ProviderFuture request = provider.newProviderFuture(synchronization);
            requests.put(request, request);
            try {
                provider.pull(consumerId, timeout, request);
                request.sync();
            } finally {
                requests.remove(request);
            }
        } catch (Exception ioe) {
            throw JmsExceptionSupport.create(ioe);
        }
    }

Since the client basically waits indefinitely (i.e. for hours) for a 
notification in the BalancedProviderFuture, I'm assuming there is either a race 
condition or no notification ever happens (I suspect the latter, but I cannot 
verify this). As a workaround we are currently testing a version in which we 
replaced the try block with the following code:

            try {
                if (timeout < 0) {
                    // negative timeout: keep the original behaviour and wait
                    // without a limit
                    provider.pull(consumerId, timeout, request);
                    request.sync();
                } else if (timeout == 0) {
                    // zero timeout: pass it straight through
                    provider.pull(consumerId, timeout, request);
                    request.sync(0, TimeUnit.NANOSECONDS);
                } else {
                    // positive timeout: forward the time remaining after the
                    // pull to sync() so the wait cannot exceed the requested
                    // timeout
                    long start = System.nanoTime();
                    provider.pull(consumerId, timeout, request);
                    long diff = TimeUnit.MILLISECONDS.toNanos(timeout) - (System.nanoTime() - start);
                    request.sync(diff <= 0 ? 0 : diff, TimeUnit.NANOSECONDS);
                }
            }

I would appreciate your feedback on this matter, and even though I'm well aware 
that this is just a workaround for a different issue, I'm curious why the 
current implementation doesn't honor the requested timeout.
Thanks in advance!
 
[^qpid_client_issue.txt] 



> Potential race condition in JmsConnection.destroyResource
> ---------------------------------------------------------
>
>                 Key: QPIDJMS-458
>                 URL: https://issues.apache.org/jira/browse/QPIDJMS-458
>             Project: Qpid JMS
>          Issue Type: Bug
>          Components: qpid-jms-client
>    Affects Versions: 0.42.0
>         Environment: OS: Windows 10 64Bit
> Broker: Apache Artemis 2.8.0
> JVM: Java HotSpot(TM) Client VM (25.40-b25, mixed mode)
> Java: version 1.8.0_40, vendor Oracle Corporation
>            Reporter: Christian Danner
>            Priority: Major
>         Attachments: qpid_client_issue.txt
>
>
> It seems there is a race condition when attempting to close a 
> JmsMessageProducer as indicated by the stack trace below. The corresponding 
> Thread is stuck waiting for the JmsMessageProducer to be destroyed for a 
> JmsConnection.
> This behaviour was observed while testing Apache Artemis with low disk space. 
> In the provided trace we attempt to close a broker connection due to a 
> JMSException (TransactionRolledBackException caused by a duplicate message 
> ID); however, the Thread gets stuck indefinitely waiting for the 
> JmsMessageProducer to be destroyed.
> We keep track of all sessions for a JmsConnection (one session per Thread) 
> and attempt to perform a graceful connection shutdown by closing all 
> producers and consumers, followed by each session before finally calling 
> close on the connection.
> We use external synchronization to ensure that the connection can only be 
> closed by a single Thread (so in this example all other Threads attempting to 
> use the broker connection are blocked waiting for the lock from the closing 
> Thread to be released).
>  
> Stack Trace:
> {{"Replicator_node1-->node2_[0ms]" #25 prio=5 os_prio=0 tid=0x49383c00 
> nid=0x3918 in Object.wait() [0x4b1ef000]
>    java.lang.Thread.State: WAITING (on object monitor)
>       at java.lang.Object.wait(Native Method)
>       at java.lang.Object.wait(Object.java:502)
>       at 
> org.apache.qpid.jms.provider.BalancedProviderFuture.sync(BalancedProviderFuture.java:137)
>       - locked <0x04e60300> (a 
> org.apache.qpid.jms.provider.BalancedProviderFuture)
>       at 
> org.apache.qpid.jms.JmsConnection.destroyResource(JmsConnection.java:755)
>       at 
> org.apache.qpid.jms.JmsConnection.destroyResource(JmsConnection.java:744)
>       at 
> org.apache.qpid.jms.JmsMessageProducer.doClose(JmsMessageProducer.java:103)
>       at 
> org.apache.qpid.jms.JmsMessageProducer.close(JmsMessageProducer.java:89)
>       at 
> acme.broker.client.jms.impl.JMSMessageProducer.closeInternal(JMSMessageProducer.java:48)
>       at 
> acme.broker.client.jms.impl.JMSMessageProducer.close(JMSMessageProducer.java:43)
>       at acme.broker.client.AbstractSession.tryClose(AbstractSession.java:108)
>       at acme.broker.client.AbstractSession.close(AbstractSession.java:90)
>       at 
> acme.broker.client.AbstractThreadedSessionManager.close(AbstractThreadedSessionManager.java:108)
>       - locked <0x1d321078> (a java.util.concurrent.ConcurrentHashMap)
>       at 
> acme.broker.client.AbstractBrokerConnection.closeInternal(AbstractBrokerConnection.java:204)
>       at 
> acme.broker.client.AbstractBrokerConnection.close(AbstractBrokerConnection.java:84)
>       at 
> acme.replication.jms.JMSMessageBridge.trySend(JMSMessageBridge.java:109)
>       at 
> acme.replication.jms.JMSMessageBridge.access$6(JMSMessageBridge.java:99)
>       at 
> acme.replication.jms.JMSMessageBridge$ReplicatorRunnable.run(JMSMessageBridge.java:62)
>       at java.lang.Thread.run(Thread.java:745)
>    Locked ownable synchronizers:
>       - <0x1cfa76b0> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
