Repository: qpid-broker-j Updated Branches: refs/heads/master 331c7276a -> e9e0e74e7
QPID-7832: Fix infinite recursion when handling an AMQP1.0 message encoded in the version 0 format. Fixes regression introduced by 660c206deb352aca3694a6b31f5f7cf6fca70533 End to end test added. Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/e9e0e74e Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/e9e0e74e Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/e9e0e74e Branch: refs/heads/master Commit: e9e0e74e7b23587fbc384aaf03250a3c3e0c7624 Parents: 331c727 Author: Keith Wall <kw...@apache.org> Authored: Thu Oct 19 13:23:57 2017 +0100 Committer: Keith Wall <kw...@apache.org> Committed: Thu Oct 19 13:33:15 2017 +0100 ---------------------------------------------------------------------- .../berkeleydb/AbstractBDBMessageStore.java | 7 +- .../test-store/00000000.jdb | Bin 0 -> 535122 bytes .../berkeleydb/BDBAMQP10V0UpgradeTest.java | 127 +++++++++++++++++++ .../qpid/server/store/StoredMemoryMessage.java | 4 + .../apache/qpid/server/store/StoredMessage.java | 5 + .../qpid/server/protocol/v1_0/Message_1_0.java | 4 +- .../store/jdbc/AbstractJDBCMessageStore.java | 7 +- 7 files changed, 151 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e9e0e74e/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java ---------------------------------------------------------------------- diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index c871508..10906ce 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -1023,7 +1023,12 @@ 
public abstract class AbstractBDBMessageStore implements MessageStore @Override public synchronized QpidByteBuffer getContent(int offset, int length) { - return getContentAsByteBuffer().view(offset, length); + QpidByteBuffer contentAsByteBuffer = getContentAsByteBuffer(); + if (length == Integer.MAX_VALUE) + { + length = contentAsByteBuffer.remaining(); + } + return contentAsByteBuffer.view(offset, length); } @Override http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e9e0e74e/bdbstore/src/test/resources/upgrade/bdbstore-v9-amqp10v0/test-store/00000000.jdb ---------------------------------------------------------------------- diff --git a/bdbstore/src/test/resources/upgrade/bdbstore-v9-amqp10v0/test-store/00000000.jdb b/bdbstore/src/test/resources/upgrade/bdbstore-v9-amqp10v0/test-store/00000000.jdb new file mode 100644 index 0000000..a234247 Binary files /dev/null and b/bdbstore/src/test/resources/upgrade/bdbstore-v9-amqp10v0/test-store/00000000.jdb differ http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e9e0e74e/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBAMQP10V0UpgradeTest.java ---------------------------------------------------------------------- diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBAMQP10V0UpgradeTest.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBAMQP10V0UpgradeTest.java new file mode 100644 index 0000000..34c550c --- /dev/null +++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBAMQP10V0UpgradeTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.io.InputStream; +import java.nio.file.Files; +import java.security.MessageDigest; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.xml.bind.DatatypeConverter; + +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBVirtualHostNode; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; +import org.apache.qpid.util.FileUtils; + +/** + * + * The store was formed with a Qpid JMS Client 0.26 and Qpid Broker v6.1.4 configured to use a BDB virtualhostnode + * and provided store. 
+ * + * byte[] content = new byte[256*1024]; + * IntStream.range(0, content.length).forEachOrdered(i -> content[i] = (byte) (i % 256)); + * BytesMessage message = session.createBytesMessage(); + * message.writeBytes(content); + * message.setStringProperty("sha256hash", DatatypeConverter.printHexBinary(MessageDigest.getInstance("SHA-256").digest(content))); + * messageProducer.send(message); + * + */ +public class BDBAMQP10V0UpgradeTest extends QpidBrokerTestCase +{ + private static final int EXPECTED_MESSAGE_LENGTH = 256 * 1024; + + private String _storeLocation; + + @Override + public void setUp() throws Exception + { + _storeLocation = Files.createTempDirectory("qpid-work-" + getClassQualifiedTestName() + "-bdb-store").toString(); + TestBrokerConfiguration brokerConfiguration = getDefaultBrokerConfiguration(); + brokerConfiguration.setObjectAttribute(VirtualHostNode.class, TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, BDBVirtualHostNode.STORE_PATH, _storeLocation ); + + // Clear the target store directory if it already exists. 
+ File directory = new File(_storeLocation); + if (directory.exists() && directory.isDirectory()) + { + FileUtils.delete(directory, true); + } + directory.mkdirs(); + + // copy store files + InputStream src = getClass().getClassLoader().getResourceAsStream("upgrade/bdbstore-v9-amqp10v0/test-store/00000000.jdb"); + FileUtils.copy(src, new File(_storeLocation, "00000000.jdb")); + + super.setUp(); + } + + @Override + public void tearDown() throws Exception + { + try + { + super.tearDown(); + } + finally + { + FileUtils.delete(new File(_storeLocation), true); + } + } + + public void testRecoverAmpqV0Message() throws Exception + { + Connection connection = getConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = createTestQueue(session, "queue"); + MessageConsumer consumer = session.createConsumer(queue); + + Message message = consumer.receive(getReceiveTimeout()); + assertNotNull("Recovered message not received", message); + assertTrue(message instanceof BytesMessage); + BytesMessage bytesMessage = ((BytesMessage) message); + + long length = bytesMessage.getBodyLength(); + String expectedContentHash = message.getStringProperty("sha256hash"); + byte[] content = new byte[(int) length]; + bytesMessage.readBytes(content); + + assertEquals("Unexpected content length", EXPECTED_MESSAGE_LENGTH, length); + assertNotNull("Message should carry expectedShaHash property", expectedContentHash); + + String contentHash = computeContentHash(content); + assertEquals("Unexpected content hash", expectedContentHash, contentHash); + session.commit(); + } + + private String computeContentHash(final byte[] content) throws Exception + { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] hash = digest.digest(content); + return DatatypeConverter.printHexBinary(hash); + } +} 
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e9e0e74e/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java index f4de0d0..8538a95 100755 --- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java +++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java @@ -83,6 +83,10 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S try (QpidByteBuffer combined = QpidByteBuffer.concatenate(_content)) { + if (length == Integer.MAX_VALUE) + { + length = combined.remaining(); + } return combined.view(offset, length); } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e9e0e74e/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java index 98b54bd..a1be172 100755 --- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java +++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java @@ -28,6 +28,11 @@ public interface StoredMessage<M extends StorableMessageMetaData> long getMessageNumber(); + /** + * Returns length bytes of message content beginning from the given offset. Caller is responsible + * for the disposal of the returned buffer. If length is {@link Integer#MAX_VALUE}, length is not + * constrained. 
+ */ QpidByteBuffer getContent(int offset, int length); int getContentSize(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e9e0e74e/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java index e02f24a..f33dfe3 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java @@ -181,7 +181,9 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM try { List<EncodingRetainingSection<?>> sections; - try (QpidByteBuffer allSectionsContent = super.getContent()) + // The v0 message format put all sections within the content, so we need to read all the stored content + // not just #getSize() + try (QpidByteBuffer allSectionsContent = super.getContent(0, Integer.MAX_VALUE)) { sections = sectionDecoder.parseAll(allSectionsContent); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e9e0e74e/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java ---------------------------------------------------------------------- diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java index 9c6cfa9..aeff70e 100644 --- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java +++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java @@ -1435,7 
+1435,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore @Override public synchronized QpidByteBuffer getContent(int offset, int length) { - return getContentAsByteBuffer().view(offset, length); + QpidByteBuffer contentAsByteBuffer = getContentAsByteBuffer(); + if (length == Integer.MAX_VALUE) + { + length = contentAsByteBuffer.remaining(); + } + return contentAsByteBuffer.view(offset, length); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org