[ https://issues.apache.org/jira/browse/ARTEMIS-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Justin Bertram resolved ARTEMIS-3551.
-------------------------------------
    Resolution: Duplicate

> CLONE - addPacket method in the
> org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl
> doesn't notify threads in case of an Exception
> ------------------------------------------------------------------------
>
>                 Key: ARTEMIS-3551
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3551
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>    Affects Versions: 2.6.4
>            Reporter: Rico Neubauer
>            Priority: Major
>         Attachments: ARTEMIS-2293-Test.patch
>
>
> Hi, kindly asking to re-look into this issue.
> Artemis 2.19.0
> Experienced the same situation and will provide a small patch to provoke the
> situation (see below - including the #notifyAll already).
> An error in #addPacket results in the #waitCompletion method waiting until
> call-timeout (default=30 sec). Having several threads in this situation then
> leads to all of them waiting until timeout.
> Furthermore, it seems that, at least while large messages are produced in
> parallel, this also has an impact on other client-consumers, which are no
> longer fed - however, I could not reproduce this reliably, so there is no
> hard evidence for that.
> My proposal, as [~pmolchanov2002] also suggested, is to add the #notifyAll in
> the exceptional case.
>
> [^ARTEMIS-2293-Test.patch]
>
> Following is the original description:
>
> The block that handles exceptions in catch(Exception e) doesn't call
> notifyAll(). As a result, other working threads are not released in the
> waitCompletion method.
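To illustrate the proposal, here is a minimal, self-contained sketch; it is not the actual Artemis code. `NotifyOnErrorSketch`, `Receiver`, the `notifyOnError` flag and `millisUntilWaiterReturns` are hypothetical names made up for this illustration; only the wait/notifyAll pattern mirrors the methods discussed in this issue. It assumes an output stream that always fails on write, and measures how long a thread blocked in a `waitCompletion`-style loop takes to return with and without a `notifyAll()` in the catch block.

```java
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical, simplified stand-in for the controller discussed in this issue.
class NotifyOnErrorSketch {

    static final class Receiver {
        private final OutputStream outStream;
        private final boolean notifyOnError; // illustration-only switch
        private boolean streamEnded;
        private Exception handledException;

        Receiver(OutputStream outStream, boolean notifyOnError) {
            this.outStream = outStream;
            this.notifyOnError = notifyOnError;
        }

        synchronized void addPacket(byte[] chunk, boolean isContinues) {
            try {
                if (!isContinues) {
                    streamEnded = true;
                }
                outStream.write(chunk);
                notifyAll();
            } catch (Exception e) {
                handledException = e;
                if (notifyOnError) {
                    notifyAll(); // proposed addition: wake waiters on failure too
                }
            }
        }

        // Blocks until the stream ended, an error was recorded, or the timeout expired.
        synchronized boolean waitCompletion(long timeoutMillis) throws InterruptedException {
            long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
            while (!streamEnded && handledException == null) {
                long remaining = (deadline - System.nanoTime()) / 1_000_000L;
                if (remaining <= 0) {
                    return false;
                }
                wait(remaining);
            }
            return true;
        }
    }

    // Returns how long (in ms) a waiting thread stays blocked when the write fails.
    static long millisUntilWaiterReturns(boolean notifyOnError, long timeoutMillis) {
        OutputStream failing = new OutputStream() {
            @Override
            public void write(int b) throws IOException {
                throw new IOException("simulated write failure");
            }
        };
        Receiver receiver = new Receiver(failing, notifyOnError);
        long[] elapsed = new long[1];
        Thread waiter = new Thread(() -> {
            long start = System.nanoTime();
            try {
                receiver.waitCompletion(timeoutMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            elapsed[0] = (System.nanoTime() - start) / 1_000_000L;
        });
        waiter.start();
        try {
            Thread.sleep(100); // give the waiter time to block in wait()
            receiver.addPacket(new byte[] {1}, true);
            waiter.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return elapsed[0];
    }

    public static void main(String[] args) {
        System.out.println("with notifyAll in catch:    "
            + millisUntilWaiterReturns(true, 2000) + " ms");
        System.out.println("without notifyAll in catch: "
            + millisUntilWaiterReturns(false, 2000) + " ms");
    }
}
```

Running `main` should show the waiter returning shortly after the failed write when `notifyOnError` is true, and only after the full timeout otherwise; exact timings depend on the machine.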
> [https://github.com/apache/activemq-artemis/blob/master/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/LargeMessageControllerImpl.java]
>
> addPacket method:
> {code:java}
> public void addPacket(byte[] chunk, int flowControlSize, boolean isContinues) {
>    int flowControlCredit = 0;
>
>    synchronized (this) {
>       packetAdded = true;
>       if (outStream != null) {
>          try {
>             if (!isContinues) {
>                streamEnded = true;
>             }
>
>             if (fileCache != null) {
>                fileCache.cachePackage(chunk);
>             }
>
>             outStream.write(chunk);
>
>             flowControlCredit = flowControlSize;
>
>             notifyAll();
>
>             if (streamEnded) {
>                outStream.close();
>             }
>          } catch (Exception e) {
>             ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
>             handledException = e;
>          }
>       } else {
>          if (fileCache != null) {
>             try {
>                fileCache.cachePackage(chunk);
>             } catch (Exception e) {
>                ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
>                handledException = e;
>             }
>          }
>
>          largeMessageData.offer(new LargeData(chunk, flowControlSize, isContinues));
>       }
>    }{code}
>
> waitCompletion method:
> {code:java}
> public synchronized boolean waitCompletion(final long timeWait) throws ActiveMQException {
>    if (outStream == null) {
>       // There is no stream.. it will never achieve the end of streaming
>       return false;
>    }
>
>    long timeOut;
>
>    // If timeWait = 0, we will use the readTimeout
>    // And we will check if no packets have arrived within readTimeout milliseconds
>    if (timeWait != 0) {
>       timeOut = System.currentTimeMillis() + timeWait;
>    } else {
>       timeOut = System.currentTimeMillis() + readTimeout;
>    }
>
>    while (!streamEnded && handledException == null) {
>       try {
>          this.wait(timeWait == 0 ? readTimeout : timeWait);
>       } catch (InterruptedException e) {
>          throw new ActiveMQInterruptedException(e);
>       }
>
>       if (!streamEnded && handledException == null) {
>          if (timeWait != 0 && System.currentTimeMillis() > timeOut) {
>             throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
>          } else if (System.currentTimeMillis() > timeOut && !packetAdded) {
>             throw ActiveMQClientMessageBundle.BUNDLE.timeoutOnLargeMessage();
>          }
>       }
>    }
>
>    checkException();
>
>    return streamEnded;
>
> }{code}
>
>

--
This message was sent by Atlassian Jira (v8.3.4#803005)