[ 
https://issues.apache.org/jira/browse/ARTEMIS-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rico Neubauer updated ARTEMIS-3551:
-----------------------------------
    Description: 
Hi, kindly asking to re-look into this issue.

Artemis 2.19.0

Experienced the same situation and will provide small patch to provoke the 
situation (see below - including the #notifyAll already).

An error on #addPacket results in the #waitCompletion method waiting until 
call–timeout (default=30 sec). Having several threads in this situation then 
leads to all of them waiting until timeout.

Furthermore, it seems that at least while large messages are produced in 
parallel, this also has an impact on other client-consumers, that are not fed 
anymore - however this I could not reproduce reliable, so no hard evidence for 
that.

My proposal is like [~pmolchanov2002] also suggested to add the #notifyAll in 
the exceptional case.

 

[^ARTEMIS-2293-Test.patch]

 

Following is the original description:

 

Block that handles exceptions in the catch(Exception e) doesn't call 
notifyAll(). That cause that other working threads are not released in the 
waitCompletion method.

[https://github.com/apache/activemq-artemis/blob/master/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java]

 

addPacket method:
{code:java}
public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues) {
        int flowControlCredit = 0;
        
        synchronized (this) {
        packetAdded = true;
        if (outStream != null) {
        try {
        if (!isContinues) {
        streamEnded = true;
        }
        
        if (fileCache != null) {
        fileCache.cachePackage(chunk);
        }
        
        outStream.write(chunk);
        
        flowControlCredit = flowControlSize;
        
        notifyAll();
        
        if (streamEnded) {
        outStream.close();
        }
        } catch (Exception e) {
        ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
        handledException = e;
        }
        } else {
        if (fileCache != null) {
        try {
        fileCache.cachePackage(chunk);
        } catch (Exception e) {
        ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
        handledException = e;
        }
        }
        
        largeMessageData.offer(new LargeData(chunk, flowControlSize, 
isContinues));
        }
        }{code}
 

waitCompletion method:
{code:java}
public synchronized boolean waitCompletion(final long timeWait) throws 
ActiveMQException {
        if (outStream == null) {
        // There is no stream.. it will never achieve the end of streaming
        return false;
        }
        
        long timeOut;
        
        // If timeWait = 0, we will use the readTimeout
        // And we will check if no packets have arrived within readTimeout 
milliseconds
        if (timeWait != 0) {
        timeOut = System.currentTimeMillis() + timeWait;
        } else {
        timeOut = System.currentTimeMillis() + readTimeout;
        }
        
        while (!streamEnded && handledException == null) {
        try {
        this.wait(timeWait == 0 ? readTimeout : timeWait);
        } catch (InterruptedException e) {
        throw new ActiveMQInterruptedException(e);
        }
        
        if (!streamEnded && handledException == null) {
        if (timeWait != 0 && System.currentTimeMillis() > timeOut) {
        throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
        } else if (System.currentTimeMillis() > timeOut && !packetAdded) {
        throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
        }
        }
        }
        
        checkException();
        
        return streamEnded;
        
        }{code}
 

 

  was:
Block that handles exceptions in the catch(Exception e) doesn't call 
notifyAll(). That cause that other working threads are not released in the 
waitCompletion method.

[https://github.com/apache/activemq-artemis/blob/master/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java]

 

addPacket method:
{code:java}
public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues) {
        int flowControlCredit = 0;
        
        synchronized (this) {
        packetAdded = true;
        if (outStream != null) {
        try {
        if (!isContinues) {
        streamEnded = true;
        }
        
        if (fileCache != null) {
        fileCache.cachePackage(chunk);
        }
        
        outStream.write(chunk);
        
        flowControlCredit = flowControlSize;
        
        notifyAll();
        
        if (streamEnded) {
        outStream.close();
        }
        } catch (Exception e) {
        ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
        handledException = e;
        }
        } else {
        if (fileCache != null) {
        try {
        fileCache.cachePackage(chunk);
        } catch (Exception e) {
        ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
        handledException = e;
        }
        }
        
        largeMessageData.offer(new LargeData(chunk, flowControlSize, 
isContinues));
        }
        }{code}
 

waitCompletion method:
{code:java}
public synchronized boolean waitCompletion(final long timeWait) throws 
ActiveMQException {
        if (outStream == null) {
        // There is no stream.. it will never achieve the end of streaming
        return false;
        }
        
        long timeOut;
        
        // If timeWait = 0, we will use the readTimeout
        // And we will check if no packets have arrived within readTimeout 
milliseconds
        if (timeWait != 0) {
        timeOut = System.currentTimeMillis() + timeWait;
        } else {
        timeOut = System.currentTimeMillis() + readTimeout;
        }
        
        while (!streamEnded && handledException == null) {
        try {
        this.wait(timeWait == 0 ? readTimeout : timeWait);
        } catch (InterruptedException e) {
        throw new ActiveMQInterruptedException(e);
        }
        
        if (!streamEnded && handledException == null) {
        if (timeWait != 0 && System.currentTimeMillis() > timeOut) {
        throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
        } else if (System.currentTimeMillis() > timeOut && !packetAdded) {
        throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
        }
        }
        }
        
        checkException();
        
        return streamEnded;
        
        }{code}
 

 


> CLONE - addPacket method in the 
> org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl  
> doesn't notify threads in case of an Exception
> -------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: ARTEMIS-3551
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3551
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>    Affects Versions: 2.6.4
>            Reporter: Rico Neubauer
>            Priority: Major
>         Attachments: ARTEMIS-2293-Test.patch
>
>
> Hi, kindly asking to re-look into this issue.
> Artemis 2.19.0
> Experienced the same situation and will provide small patch to provoke the 
> situation (see below - including the #notifyAll already).
> An error on #addPacket results in the #waitCompletion method waiting until 
> call–timeout (default=30 sec). Having several threads in this situation then 
> leads to all of them waiting until timeout.
> Furthermore, it seems that at least while large messages are produced in 
> parallel, this also has an impact on other client-consumers, that are not fed 
> anymore - however this I could not reproduce reliable, so no hard evidence 
> for that.
> My proposal is like [~pmolchanov2002] also suggested to add the #notifyAll in 
> the exceptional case.
>  
> [^ARTEMIS-2293-Test.patch]
>  
> Following is the original description:
>  
> Block that handles exceptions in the catch(Exception e) doesn't call 
> notifyAll(). That cause that other working threads are not released in the 
> waitCompletion method.
> [https://github.com/apache/activemq-artemis/blob/master/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java]
>  
> addPacket method:
> {code:java}
> public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues) 
> {
>       int flowControlCredit = 0;
>       
>       synchronized (this) {
>       packetAdded = true;
>       if (outStream != null) {
>       try {
>       if (!isContinues) {
>       streamEnded = true;
>       }
>       
>       if (fileCache != null) {
>       fileCache.cachePackage(chunk);
>       }
>       
>       outStream.write(chunk);
>       
>       flowControlCredit = flowControlSize;
>       
>       notifyAll();
>       
>       if (streamEnded) {
>       outStream.close();
>       }
>       } catch (Exception e) {
>       ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
>       handledException = e;
>       }
>       } else {
>       if (fileCache != null) {
>       try {
>       fileCache.cachePackage(chunk);
>       } catch (Exception e) {
>       ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
>       handledException = e;
>       }
>       }
>       
>       largeMessageData.offer(new LargeData(chunk, flowControlSize, 
> isContinues));
>       }
>       }{code}
>  
> waitCompletion method:
> {code:java}
> public synchronized boolean waitCompletion(final long timeWait) throws 
> ActiveMQException {
>       if (outStream == null) {
>       // There is no stream.. it will never achieve the end of streaming
>       return false;
>       }
>       
>       long timeOut;
>       
>       // If timeWait = 0, we will use the readTimeout
>       // And we will check if no packets have arrived within readTimeout 
> milliseconds
>       if (timeWait != 0) {
>       timeOut = System.currentTimeMillis() + timeWait;
>       } else {
>       timeOut = System.currentTimeMillis() + readTimeout;
>       }
>       
>       while (!streamEnded && handledException == null) {
>       try {
>       this.wait(timeWait == 0 ? readTimeout : timeWait);
>       } catch (InterruptedException e) {
>       throw new ActiveMQInterruptedException(e);
>       }
>       
>       if (!streamEnded && handledException == null) {
>       if (timeWait != 0 && System.currentTimeMillis() > timeOut) {
>       throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
>       } else if (System.currentTimeMillis() > timeOut && !packetAdded) {
>       throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
>       }
>       }
>       }
>       
>       checkException();
>       
>       return streamEnded;
>       
>       }{code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to