[ https://issues.apache.org/jira/browse/ARTEMIS-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rico Neubauer updated ARTEMIS-3551: ----------------------------------- Description: Hi, kindly asking you to take another look at this issue. Artemis 2.19.0: Experienced the same situation and will provide a small patch to provoke the situation (see below - it already includes the #notifyAll). An error in #addPacket results in the #waitCompletion method waiting until the call-timeout (default=30 sec). Having several threads in this situation then leads to all of them waiting until the timeout. Furthermore, it seems that, at least while large messages are produced in parallel, this also has an impact on other client consumers, which are no longer fed - however, I could not reproduce this reliably, so there is no hard evidence for that. My proposal, as [~pmolchanov2002] also suggested, is to add the #notifyAll in the exceptional case. [^ARTEMIS-2293-Test.patch] Following is the original description: The block that handles exceptions in the catch(Exception e) doesn't call notifyAll(). As a result, other threads waiting in the waitCompletion method are not released. 
[https://github.com/apache/activemq-artemis/blob/master/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java] addPacket method: {code:java} public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues) { int flowControlCredit = 0; synchronized (this) { packetAdded = true; if (outStream != null) { try { if (!isContinues) { streamEnded = true; } if (fileCache != null) { fileCache.cachePackage(chunk); } outStream.write(chunk); flowControlCredit = flowControlSize; notifyAll(); if (streamEnded) { outStream.close(); } } catch (Exception e) { ActiveMQClientLogger.LOGGER.errorAddingPacket(e); handledException = e; } } else { if (fileCache != null) { try { fileCache.cachePackage(chunk); } catch (Exception e) { ActiveMQClientLogger.LOGGER.errorAddingPacket(e); handledException = e; } } largeMessageData.offer(new LargeData(chunk, flowControlSize, isContinues)); } }{code} waitCompletion method: {code:java} public synchronized boolean waitCompletion(final long timeWait) throws ActiveMQException { if (outStream == null) { // There is no stream.. it will never achieve the end of streaming return false; } long timeOut; // If timeWait = 0, we will use the readTimeout // And we will check if no packets have arrived within readTimeout milliseconds if (timeWait != 0) { timeOut = System.currentTimeMillis() + timeWait; } else { timeOut = System.currentTimeMillis() + readTimeout; } while (!streamEnded && handledException == null) { try { this.wait(timeWait == 0 ? 
readTimeout : timeWait); } catch (InterruptedException e) { throw new ActiveMQInterruptedException(e); } if (!streamEnded && handledException == null) { if (timeWait != 0 && System.currentTimeMillis() > timeOut) { throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage(); } else if (System.currentTimeMillis() > timeOut && !packetAdded) { throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage(); } } } checkException(); return streamEnded; }{code} was: Block that handles exceptions in the catch(Exception e) doesn't call notifyAll(). That cause that other working threads are not released in the waitCompletion method. [https://github.com/apache/activemq-artemis/blob/master/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java] addPacket method: {code:java} public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues) { int flowControlCredit = 0; synchronized (this) { packetAdded = true; if (outStream != null) { try { if (!isContinues) { streamEnded = true; } if (fileCache != null) { fileCache.cachePackage(chunk); } outStream.write(chunk); flowControlCredit = flowControlSize; notifyAll(); if (streamEnded) { outStream.close(); } } catch (Exception e) { ActiveMQClientLogger.LOGGER.errorAddingPacket(e); handledException = e; } } else { if (fileCache != null) { try { fileCache.cachePackage(chunk); } catch (Exception e) { ActiveMQClientLogger.LOGGER.errorAddingPacket(e); handledException = e; } } largeMessageData.offer(new LargeData(chunk, flowControlSize, isContinues)); } }{code} waitCompletion method: {code:java} public synchronized boolean waitCompletion(final long timeWait) throws ActiveMQException { if (outStream == null) { // There is no stream.. 
it will never achieve the end of streaming return false; } long timeOut; // If timeWait = 0, we will use the readTimeout // And we will check if no packets have arrived within readTimeout milliseconds if (timeWait != 0) { timeOut = System.currentTimeMillis() + timeWait; } else { timeOut = System.currentTimeMillis() + readTimeout; } while (!streamEnded && handledException == null) { try { this.wait(timeWait == 0 ? readTimeout : timeWait); } catch (InterruptedException e) { throw new ActiveMQInterruptedException(e); } if (!streamEnded && handledException == null) { if (timeWait != 0 && System.currentTimeMillis() > timeOut) { throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage(); } else if (System.currentTimeMillis() > timeOut && !packetAdded) { throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage(); } } } checkException(); return streamEnded; }{code} > CLONE - addPacket method in the > org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl > doesn't notify threads in case of an Exception > ------------------------------------------------------------------------------------------------------------------------------------------------------- > > Key: ARTEMIS-3551 > URL: https://issues.apache.org/jira/browse/ARTEMIS-3551 > Project: ActiveMQ Artemis > Issue Type: Bug > Affects Versions: 2.6.4 > Reporter: Rico Neubauer > Priority: Major > Attachments: ARTEMIS-2293-Test.patch > > > Hi, kindly asking to re-look into this issue. > Artemis 2.19.0 > Experienced the same situation and will provide small patch to provoke the > situation (see below - including the #notifyAll already). > An error on #addPacket results in the #waitCompletion method waiting until > call–timeout (default=30 sec). Having several threads in this situation then > leads to all of them waiting until timeout. 
> Furthermore, it seems that at least while large messages are produced in > parallel, this also has an impact on other client-consumers, that are not fed > anymore - however this I could not reproduce reliable, so no hard evidence > for that. > My proposal is like [~pmolchanov2002] also suggested to add the #notifyAll in > the exceptional case. > > [^ARTEMIS-2293-Test.patch] > > Following is the original description: > > Block that handles exceptions in the catch(Exception e) doesn't call > notifyAll(). That cause that other working threads are not released in the > waitCompletion method. > [https://github.com/apache/activemq-artemis/blob/master/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java] > > addPacket method: > {code:java} > public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues) > { > int flowControlCredit = 0; > > synchronized (this) { > packetAdded = true; > if (outStream != null) { > try { > if (!isContinues) { > streamEnded = true; > } > > if (fileCache != null) { > fileCache.cachePackage(chunk); > } > > outStream.write(chunk); > > flowControlCredit = flowControlSize; > > notifyAll(); > > if (streamEnded) { > outStream.close(); > } > } catch (Exception e) { > ActiveMQClientLogger.LOGGER.errorAddingPacket(e); > handledException = e; > } > } else { > if (fileCache != null) { > try { > fileCache.cachePackage(chunk); > } catch (Exception e) { > ActiveMQClientLogger.LOGGER.errorAddingPacket(e); > handledException = e; > } > } > > largeMessageData.offer(new LargeData(chunk, flowControlSize, > isContinues)); > } > }{code} > > waitCompletion method: > {code:java} > public synchronized boolean waitCompletion(final long timeWait) throws > ActiveMQException { > if (outStream == null) { > // There is no stream.. 
it will never achieve the end of streaming > return false; > } > > long timeOut; > > // If timeWait = 0, we will use the readTimeout > // And we will check if no packets have arrived within readTimeout > milliseconds > if (timeWait != 0) { > timeOut = System.currentTimeMillis() + timeWait; > } else { > timeOut = System.currentTimeMillis() + readTimeout; > } > > while (!streamEnded && handledException == null) { > try { > this.wait(timeWait == 0 ? readTimeout : timeWait); > } catch (InterruptedException e) { > throw new ActiveMQInterruptedException(e); > } > > if (!streamEnded && handledException == null) { > if (timeWait != 0 && System.currentTimeMillis() > timeOut) { > throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage(); > } else if (System.currentTimeMillis() > timeOut && !packetAdded) { > throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage(); > } > } > } > > checkException(); > > return streamEnded; > > }{code} > > -- This message was sent by Atlassian Jira (v8.3.4#803005)