This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 974609d206ef810d8b3ff27dc099ad2e8065ff2a Author: Clebert Suconic <[email protected]> AuthorDate: Mon Apr 20 13:40:57 2020 -0400 ARTEMIS-2728 Fixing Deadlock with LargeServerMessage verified by LargeMessageTest::testTwoBindingsAndRestart --- .../protocol/amqp/broker/AMQPLargeMessage.java | 22 +++++---- .../impl/journal/LargeServerMessageImpl.java | 52 ++++++++++++++-------- 2 files changed, 46 insertions(+), 28 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java index 5db786d..0bbaffc 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java @@ -141,10 +141,12 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage internalReleaseBuffer(2); } - private synchronized void internalReleaseBuffer(int releases) { - for (int i = 0; i < releases; i++) { - if (temporaryBuffer != null && temporaryBuffer.release()) { - temporaryBuffer = null; + private void internalReleaseBuffer(int releases) { + synchronized (largeBody) { + for (int i = 0; i < releases; i++) { + if (temporaryBuffer != null && temporaryBuffer.release()) { + temporaryBuffer = null; + } } } } @@ -154,12 +156,14 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage return temporaryBuffer; } - public synchronized ByteBuf getSavedEncodeBuffer() { - if (temporaryBuffer == null) { - temporaryBuffer = PooledByteBufAllocator.DEFAULT.buffer(getEstimateSavedEncode()); - saveEncoding(temporaryBuffer); + public ByteBuf getSavedEncodeBuffer() { + synchronized (largeBody) { + if (temporaryBuffer == null) { + temporaryBuffer = PooledByteBufAllocator.DEFAULT.buffer(getEstimateSavedEncode()); + saveEncoding(temporaryBuffer); + } + return temporaryBuffer.retain(1); } - return temporaryBuffer.retain(1); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index df5cd67..f931121 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -169,18 +169,24 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar } @Override - public synchronized void addBytes(final byte[] bytes) throws Exception { - largeBody.addBytes(bytes); + public void addBytes(final byte[] bytes) throws Exception { + synchronized (largeBody) { + largeBody.addBytes(bytes); + } } @Override - public synchronized void addBytes(final ActiveMQBuffer bytes) throws Exception { - largeBody.addBytes(bytes); + public void addBytes(final ActiveMQBuffer bytes) throws Exception { + synchronized (largeBody) { + largeBody.addBytes(bytes); + } } @Override - public synchronized int getEncodeSize() { - return getHeadersAndPropertiesEncodeSize(); + public int getEncodeSize() { + synchronized (largeBody) { + return getHeadersAndPropertiesEncodeSize(); + } } public void encode(final ActiveMQBuffer buffer1) { @@ -223,23 +229,29 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar } @Override - public synchronized void deleteFile() throws Exception { - largeBody.deleteFile(); + public void deleteFile() throws Exception { + synchronized (largeBody) { + largeBody.deleteFile(); + } } @Override - public synchronized int getMemoryEstimate() { - if (memoryEstimate == -1) { - // The body won't be on memory (aways on-file), so we don't consider this for paging - memoryEstimate = getHeadersAndPropertiesEncodeSize() + DataConstants.SIZE_INT + getEncodeSize() + (16 + 4) * 2 + 1; + public int getMemoryEstimate() { + synchronized (largeBody) { + if (memoryEstimate == -1) { + // The body won't be on memory (aways on-file), so we don't consider this for paging + memoryEstimate = getHeadersAndPropertiesEncodeSize() + DataConstants.SIZE_INT + getEncodeSize() + (16 + 4) * 2 + 1; + } + + return memoryEstimate; } - - return memoryEstimate; } @Override - public synchronized void releaseResources(boolean sync) { - largeBody.releaseResources(sync); + public void releaseResources(boolean sync) { + synchronized (largeBody) { + largeBody.releaseResources(sync); + } } @Override @@ -310,12 +322,14 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar } @Override - public synchronized void validateFile() throws ActiveMQException { + public void validateFile() throws ActiveMQException { this.ensureFileExists(true); } - public synchronized void ensureFileExists(boolean toOpen) throws ActiveMQException { - largeBody.ensureFileExists(toOpen); + public void ensureFileExists(boolean toOpen) throws ActiveMQException { + synchronized (largeBody) { + largeBody.ensureFileExists(toOpen); + } }
