QPID-7832: [Java Broker] Refactor store/protocol API using Collection<QpidByteBuffer>
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/660c206d Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/660c206d Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/660c206d Branch: refs/heads/master Commit: 660c206deb352aca3694a6b31f5f7cf6fca70533 Parents: 955a79b Author: Lorenz Quack <lqu...@apache.org> Authored: Fri Oct 13 16:10:50 2017 +0100 Committer: Lorenz Quack <lqu...@apache.org> Committed: Wed Oct 18 16:21:21 2017 +0100 ---------------------------------------------------------------------- .../berkeleydb/AbstractBDBMessageStore.java | 199 +-- .../berkeleydb/tuple/ByteBufferBinding.java | 15 +- .../tuple/MessageMetaDataBinding.java | 25 +- .../store/berkeleydb/BDBMessageStoreTest.java | 247 +-- .../bytebuffer/NonPooledByteBufferRef.java | 4 + .../server/bytebuffer/PooledByteBufferRef.java | 6 + .../qpid/server/bytebuffer/QpidByteBuffer.java | 1656 +++++++++++++----- .../bytebuffer/QpidByteBufferInputStream.java | 109 -- .../bytebuffer/QpidByteBufferOutputStream.java | 62 +- .../server/bytebuffer/QpidByteBufferUtils.java | 285 --- .../configuration/updater/TaskExecutorImpl.java | 33 +- .../message/AbstractServerMessageImpl.java | 8 +- .../server/message/MessageContentSource.java | 5 +- .../message/internal/InternalMessage.java | 21 +- .../internal/InternalMessageMetaDataType.java | 6 +- .../qpid/server/plugin/MessageFormat.java | 6 +- .../qpid/server/plugin/MessageMetaDataType.java | 6 +- ...idByteBufferDisposingThreadPoolExecutor.java | 81 - .../server/protocol/v0_8/AMQShortString.java | 2 +- .../qpid/server/protocol/v0_8/FieldArray.java | 11 +- .../apache/qpid/server/queue/AbstractQueue.java | 53 +- .../qpid/server/store/MemoryMessageStore.java | 9 +- .../qpid/server/store/StoredMemoryMessage.java | 77 +- .../apache/qpid/server/store/StoredMessage.java | 4 +- .../store/serializer/v1/Deserializer.java | 27 +- .../store/serializer/v1/MessageRecord.java | 19 +- .../v1/MessageStoreSerializer_v1.java | 35 +- .../transport/MultiVersionProtocolEngine.java | 41 +- .../transport/NetworkConnectionScheduler.java | 17 +- .../server/transport/NonBlockingConnection.java | 52 +- .../NonBlockingConnectionDelegate.java | 3 +- .../NonBlockingConnectionPlainDelegate.java | 50 +- .../NonBlockingConnectionTLSDelegate.java | 61 +- .../NonBlockingConnectionUndecidedDelegate.java | 38 +- .../transport/network/security/ssl/SSLUtil.java | 11 +- .../qpid/server/transport/util/Functions.java | 13 - .../qpid/server/util/ByteBufferUtils.java | 71 - .../qpid/server/util/HousekeepingExecutor.java | 26 +- .../AsynchronousMessageStoreRecoverer.java | 13 +- .../QpidByteBufferOutputStreamTest.java | 119 +- .../server/bytebuffer/QpidByteBufferTest.java | 491 +++--- .../server/protocol/v0_8/FieldTableTest.java | 10 +- .../security/TrustStoreMessageSourceTest.java | 22 +- .../qpid/server/store/MessageStoreTestCase.java | 12 - .../store/TestMessageMetaDataFactory.java | 9 +- .../server/store/TestMessageMetaDataType.java | 15 +- .../qpid/server/txn/MockServerMessage.java | 10 +- .../VirtualHostPropertiesNodeTest.java | 4 + .../protocol/v0_10/ConsumerTarget_0_10.java | 63 +- .../MessageConverter_Internal_to_v0_10.java | 6 +- .../protocol/v0_10/MessageConverter_v0_10.java | 4 +- .../MessageConverter_v0_10_to_Internal.java | 8 +- .../protocol/v0_10/MessageFormat_0_10.java | 42 +- .../v0_10/MessageMetaDataType_0_10.java | 6 +- .../protocol/v0_10/MessageMetaData_0_10.java | 2 +- .../protocol/v0_10/MessageTransferMessage.java | 8 +- .../server/protocol/v0_10/ServerAssembler.java | 208 ++- .../server/protocol/v0_10/ServerConnection.java | 11 +- .../server/protocol/v0_10/ServerDecoder.java | 117 +- .../protocol/v0_10/ServerDisassembler.java | 109 +- .../protocol/v0_10/ServerSessionDelegate.java | 7 +- .../v0_10/transport/MessageTransfer.java | 79 +- .../server/protocol/v0_10/transport/Method.java | 10 +- .../MessageConverter_0_10_to_InternalTest.java | 13 +- .../MessageConverter_Internal_to_0_10Test.java | 30 +- .../PropertyConverter_0_10_to_InternalTest.java | 8 +- ...PropertyConverter_Internal_to_v0_10Test.java | 4 +- .../protocol/v0_10/ServerSessionTest.java | 11 +- .../qpid/server/protocol/v0_8/AMQChannel.java | 22 + .../protocol/v0_8/AMQPConnection_0_8Impl.java | 19 +- .../qpid/server/protocol/v0_8/CachedFrame.java | 14 +- .../v0_8/MessageConverter_Internal_to_v0_8.java | 6 +- .../v0_8/MessageConverter_v0_8_to_Internal.java | 8 +- .../protocol/v0_8/MessageFormat_0_9_1.java | 153 +- .../server/protocol/v0_8/MessageMetaData.java | 39 +- .../protocol/v0_8/MessageMetaDataType_0_8.java | 6 +- .../v0_8/ProtocolOutputConverterImpl.java | 106 +- .../protocol/v0_8/transport/AMQFrame.java | 23 +- .../v0_8/transport/AMQMethodBodyImpl.java | 15 +- .../transport/BasicContentHeaderProperties.java | 33 +- .../protocol/v0_8/transport/ContentBody.java | 19 +- .../v0_8/transport/ContentHeaderBody.java | 22 +- .../server/protocol/v0_8/AMQDecoderTest.java | 65 +- .../MessageConverter_0_8_to_InternalTest.java | 13 +- .../MessageConverter_Internal_to_0_8Test.java | 31 +- .../v0_8/MessageMetaDataFactoryTest.java | 54 +- .../PropertyConverter_v0_8_to_InternalTest.java | 9 +- .../protocol/v1_0/AMQPConnection_1_0.java | 3 +- .../protocol/v1_0/AMQPConnection_1_0Impl.java | 115 +- .../v1_0/AbstractReceivingLinkEndpoint.java | 12 + .../protocol/v1_0/ConsumerTarget_1_0.java | 26 +- .../qpid/server/protocol/v1_0/Delivery.java | 21 +- .../v1_0/MessageConverter_from_1_0.java | 49 +- .../protocol/v1_0/MessageConverter_to_1_0.java | 42 +- .../server/protocol/v1_0/MessageFormat_1_0.java | 11 +- .../protocol/v1_0/MessageMetaDataType_1_0.java | 6 +- .../protocol/v1_0/MessageMetaData_1_0.java | 25 +- .../qpid/server/protocol/v1_0/Message_1_0.java | 60 +- .../qpid/server/protocol/v1_0/Session_1_0.java | 18 +- .../v1_0/StandardReceivingLinkEndpoint.java | 13 +- .../TxnCoordinatorReceivingLinkEndpoint.java | 106 +- .../codec/AbstractCompositeTypeConstructor.java | 22 +- .../codec/AbstractDescribedTypeConstructor.java | 8 +- .../v1_0/codec/ArrayTypeConstructor.java | 27 +- .../v1_0/codec/BinaryTypeConstructor.java | 15 +- .../protocol/v1_0/codec/BooleanConstructor.java | 15 +- .../v1_0/codec/ByteTypeConstructor.java | 9 +- .../v1_0/codec/CharTypeConstructor.java | 13 +- .../protocol/v1_0/codec/DecimalConstructor.java | 22 +- .../v1_0/codec/DescribedTypeConstructor.java | 6 +- .../v1_0/codec/DoubleTypeConstructor.java | 9 +- .../v1_0/codec/FloatTypeConstructor.java | 9 +- .../server/protocol/v1_0/codec/FrameWriter.java | 38 +- .../protocol/v1_0/codec/IntTypeConstructor.java | 9 +- .../protocol/v1_0/codec/ListConstructor.java | 15 +- .../v1_0/codec/LongTypeConstructor.java | 9 +- .../protocol/v1_0/codec/MapConstructor.java | 18 +- .../v1_0/codec/NullTypeConstructor.java | 4 +- .../v1_0/codec/ShortTypeConstructor.java | 9 +- .../v1_0/codec/SmallIntConstructor.java | 9 +- .../v1_0/codec/SmallLongConstructor.java | 9 +- .../v1_0/codec/SmallUIntConstructor.java | 9 +- .../v1_0/codec/SmallULongConstructor.java | 9 +- .../v1_0/codec/StringTypeConstructor.java | 61 +- .../v1_0/codec/SymbolTypeConstructor.java | 67 +- .../v1_0/codec/TimestampTypeConstructor.java | 8 +- .../protocol/v1_0/codec/TypeConstructor.java | 4 +- .../v1_0/codec/UByteTypeConstructor.java | 9 +- .../v1_0/codec/UIntTypeConstructor.java | 9 +- .../v1_0/codec/ULongTypeConstructor.java | 9 +- .../v1_0/codec/UShortTypeConstructor.java | 9 +- .../v1_0/codec/UUIDTypeConstructor.java | 10 +- .../protocol/v1_0/codec/ValueHandler.java | 42 +- .../v1_0/codec/ZeroListConstructor.java | 2 +- .../v1_0/codec/ZeroUIntConstructor.java | 6 +- .../v1_0/codec/ZeroULongConstructor.java | 6 +- .../server/protocol/v1_0/framing/AMQFrame.java | 12 +- .../protocol/v1_0/framing/FrameHandler.java | 21 +- .../protocol/v1_0/framing/TransportFrame.java | 4 +- .../protocol/v1_0/messaging/SectionDecoder.java | 2 +- .../v1_0/messaging/SectionDecoderImpl.java | 36 +- .../protocol/v1_0/store/LinkStoreUtils.java | 22 +- .../v1_0/type/messaging/AbstractSection.java | 101 +- .../type/messaging/AmqpSequenceSection.java | 2 +- .../v1_0/type/messaging/AmqpValueSection.java | 4 +- .../messaging/ApplicationPropertiesSection.java | 3 +- .../v1_0/type/messaging/DataSection.java | 4 +- .../messaging/DeliveryAnnotationsSection.java | 3 +- .../v1_0/type/messaging/FooterSection.java | 3 +- .../v1_0/type/messaging/HeaderSection.java | 4 +- .../messaging/MessageAnnotationsSection.java | 3 +- .../v1_0/type/messaging/PropertiesSection.java | 4 +- .../codec/AbstractLazyConstructor.java | 44 +- .../codec/AmqpSequenceSectionConstructor.java | 4 +- .../codec/AmqpValueSectionConstructor.java | 33 +- ...ApplicationPropertiesSectionConstructor.java | 4 +- .../messaging/codec/DataSectionConstructor.java | 31 +- .../DeliveryAnnotationsSectionConstructor.java | 4 +- .../codec/DescribedListSectionConstructor.java | 35 +- .../codec/DescribedMapSectionConstructor.java | 35 +- .../type/messaging/codec/EncodingRetaining.java | 4 +- .../codec/FooterSectionConstructor.java | 4 +- .../codec/HeaderSectionConstructor.java | 4 +- .../MessageAnnotationsSectionConstructor.java | 4 +- .../codec/PropertiesSectionConstructor.java | 4 +- .../protocol/v1_0/type/transport/Transfer.java | 31 +- .../protocol/v1_0/ConsumerTarget_1_0Test.java | 36 +- .../MessageConverter_Internal_to_1_0Test.java | 20 +- .../MessageConverter_v1_0_to_InternalTest.java | 33 +- .../PropertyConverter_v1_0_to_InternalTest.java | 13 +- .../MessageConverter_1_0_to_v0_10.java | 6 +- .../MessageConverter_0_10_to_1_0Test.java | 21 +- .../MessageConverter_1_0_to_v0_10Test.java | 88 +- .../PropertyConverter_0_10_to_1_0Test.java | 8 +- .../PropertyConverter_1_0_to_0_10Test.java | 16 +- .../MessageConverter_0_10_to_0_8.java | 23 +- .../MessageConverter_0_8_to_0_10.java | 16 +- .../PropertyConverter_0_10_to_0_8Test.java | 8 +- .../PropertyConverter_0_8_to_0_10Test.java | 9 +- .../v0_8_v1_0/MessageConverter_1_0_to_v0_8.java | 6 +- .../MessageConverter_0_8_to_1_0Test.java | 21 +- .../MessageConverter_1_0_to_v0_8Test.java | 87 +- .../PropertyConverter_0_8_to_1_0Test.java | 8 +- .../PropertyConverter_1_0_to_0_8Test.java | 5 +- .../store/derby/AbstractDerbyMessageStore.java | 5 +- .../qpid/server/store/derby/DerbyUtils.java | 5 +- .../store/jdbc/AbstractJDBCMessageStore.java | 204 +-- .../jdbc/GenericJDBCConfigurationStore.java | 13 +- .../store/jdbc/GenericJDBCMessageStore.java | 9 +- .../management/plugin/HttpManagement.java | 59 +- .../management/plugin/report/ReportRunner.java | 11 +- .../plugin/report/ReportRunnerTest.java | 3 + .../transport/websocket/WebSocketProvider.java | 119 +- .../tests/protocol/v1_0/FrameTransport.java | 33 +- .../qpid/tests/protocol/v1_0/Interaction.java | 19 +- .../tests/protocol/v1_0/MessageDecoder.java | 11 +- .../tests/protocol/v1_0/MessageEncoder.java | 10 +- .../qpid/tests/protocol/v1_0/OutputHandler.java | 4 +- .../apache/qpid/tests/protocol/v1_0/Utils.java | 44 +- .../tests/protocol/v1_0/DecodeErrorTest.java | 39 +- .../protocol/v1_0/messaging/MessageFormat.java | 9 +- .../v1_0/messaging/MultiTransferTest.java | 60 +- .../protocol/v1_0/messaging/OutcomeTest.java | 1 - .../protocol/v1_0/messaging/TransferTest.java | 8 +- .../transport/link/ResumeDeliveriesTest.java | 13 +- .../qpid/transport/ProtocolNegotiationTest.java | 1 + 206 files changed, 3492 insertions(+), 4625 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java ---------------------------------------------------------------------- diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index f84b02c..c871508 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -21,11 +21,9 @@ package org.apache.qpid.server.store.berkeleydb; import static org.apache.qpid.server.store.berkeleydb.BDBUtils.DEFAULT_DATABASE_CONFIG; import static org.apache.qpid.server.store.berkeleydb.BDBUtils.abortTransactionSafely; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Random; @@ -62,7 +60,6 @@ import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; -import org.apache.qpid.server.store.berkeleydb.tuple.ByteBufferBinding; import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding; import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding; import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding; @@ -88,12 +85,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore private static final String BRIDGEDB_NAME = "BRIDGES"; private static final String LINKDB_NAME = "LINKS"; private static final String XID_DB_NAME = "XIDS"; - private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocateDirect(0); private final EventManager _eventManager = new EventManager(); private final DatabaseEntry MESSAGE_METADATA_SEQ_KEY = new DatabaseEntry("MESSAGE_METADATA_SEQ_KEY".getBytes( - Charset.forName("UTF-8"))); + StandardCharsets.UTF_8)); private final SequenceConfig MESSAGE_METADATA_SEQ_CONFIG = SequenceConfig.DEFAULT. setAllowCreate(true). @@ -351,66 +347,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - - /** - * Fills the provided ByteBuffer with as much content for the specified message as possible, starting - * from the specified offset in the message. - * - * @param messageId The message to get the data for. - * @param offset The offset of the data within the message. - * @param dst The destination of the content read back - * - * @return The number of bytes inserted into the destination - * - * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist. - */ - int getContent(long messageId, int offset, ByteBuffer dst) throws StoreException - { - DatabaseEntry contentKeyEntry = new DatabaseEntry(); - LongBinding.longToEntry(messageId, contentKeyEntry); - DatabaseEntry value = new DatabaseEntry(); - ByteBufferBinding contentTupleBinding = ByteBufferBinding.getInstance(); - - - getLogger().debug("Message Id: {} Getting content body from offset: {}", messageId, offset); - - - try - { - - int written = 0; - OperationStatus status = getMessageContentDb().get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED); - if (status == OperationStatus.SUCCESS) - { - QpidByteBuffer buffer = contentTupleBinding.entryToObject(value); - int size = buffer.remaining(); - if (offset > size) - { - throw new RuntimeException("Offset " + offset + " is greater than message size " + size - + " for message id " + messageId + "!"); - - } - - written = size - offset; - if(written > dst.remaining()) - { - written = dst.remaining(); - } - buffer = buffer.view(offset, written); - buffer.get(dst); - } - return written; - } - catch (RuntimeException e) - { - throw getEnvironmentFacade().handleDatabaseException("Error getting AMQMessage with id " - + messageId - + " to database: " - + e.getMessage(), e); - } - } - - Collection<QpidByteBuffer> getAllContent(long messageId) throws StoreException + QpidByteBuffer getAllContent(long messageId) throws StoreException { DatabaseEntry contentKeyEntry = new DatabaseEntry(); LongBinding.longToEntry(messageId, contentKeyEntry); @@ -427,15 +364,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore byte[] data = value.getData(); int offset = value.getOffset(); int length = value.getSize(); - Collection<QpidByteBuffer> buffers = QpidByteBuffer.allocateDirectCollection(length); - for(QpidByteBuffer buf : buffers) - { - int bufSize = buf.remaining(); - buf.put(data, offset, bufSize); - buf.flip(); - offset+=bufSize; - } - return buffers; + QpidByteBuffer buf = QpidByteBuffer.allocateDirect(length); + buf.put(data, offset, length); + buf.flip(); + return buf; } else { @@ -534,25 +466,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore * * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason, or if the specified message does not exist. */ - private void addContent(final Transaction tx, long messageId, - Collection<QpidByteBuffer> contentBody) throws StoreException + private void addContent(final Transaction tx, long messageId, QpidByteBuffer contentBody) throws StoreException { DatabaseEntry key = new DatabaseEntry(); LongBinding.longToEntry(messageId, key); DatabaseEntry value = new DatabaseEntry(); - int size = 0; - - for(QpidByteBuffer buf : contentBody) - { - size += buf.remaining(); - } - byte[] data = new byte[size]; - ByteBuffer dst = ByteBuffer.wrap(data); - for(QpidByteBuffer buf : contentBody) - { - buf.copyTo(dst); - } + byte[] data = new byte[contentBody.remaining()]; + contentBody.copyTo(data); value.setData(data); try { @@ -937,7 +858,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore private static class MessageDataRef<T extends StorableMessageMetaData> { private volatile T _metaData; - private volatile Collection<QpidByteBuffer> _data; + private volatile QpidByteBuffer _data; private volatile boolean _isHardRef; private MessageDataRef(final T metaData, boolean isHardRef) @@ -945,7 +866,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore this(metaData, null, isHardRef); } - private MessageDataRef(final T metaData, Collection<QpidByteBuffer> data, boolean isHardRef) + private MessageDataRef(final T metaData, QpidByteBuffer data, boolean isHardRef) { _metaData = metaData; _data = data; @@ -957,12 +878,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore return _metaData; } - public Collection<QpidByteBuffer> getData() + public QpidByteBuffer getData() { return _data; } - public void setData(final Collection<QpidByteBuffer> data) + public void setData(final QpidByteBuffer data) { _data = data; } @@ -997,11 +918,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore } if(_data != null) { - for(QpidByteBuffer buf : _data) - { - bytesCleared += buf.remaining(); - buf.dispose(); - } + bytesCleared += _data.remaining(); + _data.dispose(); _data = null; } return bytesCleared; @@ -1058,20 +976,17 @@ public abstract class AbstractBDBMessageStore implements MessageStore @Override public synchronized void addContent(QpidByteBuffer src) { - src = src.slice(); - Collection<QpidByteBuffer> data = _messageDataRef.getData(); - if(data == null) + try(QpidByteBuffer data = _messageDataRef.getData()) { - _messageDataRef.setData(Collections.singleton(src)); - } - else - { - List<QpidByteBuffer> newCollection = new ArrayList<>(data.size()+1); - newCollection.addAll(data); - newCollection.add(src); - _messageDataRef.setData(Collections.unmodifiableCollection(newCollection)); + if(data == null) + { + _messageDataRef.setData(src.slice()); + } + else + { + _messageDataRef.setData(QpidByteBuffer.concatenate(Arrays.asList(data, src))); + } } - } @Override @@ -1082,11 +997,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore } /** - * returns QBBs containing the content. The caller must not dispose of them because we keep a reference in _messageDataRef. + * returns QBB containing the content. The caller must not dispose of them because we keep a reference in _messageDataRef. */ - private Collection<QpidByteBuffer> getContentAsByteBuffer() + private QpidByteBuffer getContentAsByteBuffer() { - Collection<QpidByteBuffer> data = _messageDataRef == null ? Collections.<QpidByteBuffer>emptyList() : _messageDataRef.getData(); + QpidByteBuffer data = _messageDataRef == null ? QpidByteBuffer.emptyQpidByteBuffer() : _messageDataRef.getData(); if(data == null) { if(stored()) @@ -1098,7 +1013,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } else { - data = Collections.emptyList(); + data = QpidByteBuffer.emptyQpidByteBuffer(); } } return data; @@ -1106,49 +1021,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore @Override - public synchronized Collection<QpidByteBuffer> getContent(int offset, int length) + public synchronized QpidByteBuffer getContent(int offset, int length) { - Collection<QpidByteBuffer> bufs = getContentAsByteBuffer(); - Collection<QpidByteBuffer> content = new ArrayList<>(bufs.size()); - int pos = 0; - for (QpidByteBuffer buf : bufs) - { - if(length > 0) - { - int bufRemaining = buf.remaining(); - if (pos + bufRemaining <= offset) - { - pos += bufRemaining; - } - else if (pos >= offset) - { - buf = buf.duplicate(); - if (bufRemaining <= length) - { - length -= bufRemaining; - } - else - { - buf.limit(length); - length = 0; - } - content.add(buf); - pos += buf.remaining(); - - } - else - { - int offsetInBuf = offset - pos; - int limit = length < bufRemaining - offsetInBuf ? length : bufRemaining - offsetInBuf; - final QpidByteBuffer bufView = buf.view(offsetInBuf, limit); - content.add(bufView); - length -= limit; - pos+=limit+offsetInBuf; - } - } - - } - return content; + return getContentAsByteBuffer().view(offset, length); } @Override @@ -1170,7 +1045,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore AbstractBDBMessageStore.this.storeMetaData(txn, _messageId, _messageDataRef.getMetaData()); AbstractBDBMessageStore.this.addContent(txn, _messageId, _messageDataRef.getData() == null - ? Collections.<QpidByteBuffer>emptySet() + ? QpidByteBuffer.emptyQpidByteBuffer() : _messageDataRef.getData()); _messageDataRef.setSoft(); } @@ -1219,14 +1094,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore metaData.dispose(); } - Collection<QpidByteBuffer> data = _messageDataRef.getData(); - if(data != null) + try (QpidByteBuffer data = _messageDataRef.getData()) { - bytesCleared += getContentSize(); - _messageDataRef.setData(null); - for(QpidByteBuffer buf : data) + if (data != null) { - buf.dispose(); + bytesCleared += getContentSize(); + _messageDataRef.setData(null); } } _messageDataRef = null; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java ---------------------------------------------------------------------- diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java index d6d09dc..3b10bfc 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java @@ -71,15 +71,16 @@ public class ByteBufferBinding extends TupleBinding<QpidByteBuffer> @Override public void objectToEntry(QpidByteBuffer data, final TupleOutput output) { - QpidByteBuffer dup = data.duplicate(); - byte[] copyBuf = COPY_BUFFER.get(); - while(dup.hasRemaining()) + try (QpidByteBuffer dup = data.duplicate()) { - int length = Math.min(COPY_BUFFER_SIZE, dup.remaining()); - dup.get(copyBuf,0,length); - output.write(copyBuf,0,length); + byte[] copyBuf = COPY_BUFFER.get(); + while (dup.hasRemaining()) + { + int length = Math.min(COPY_BUFFER_SIZE, dup.remaining()); + dup.get(copyBuf, 0, length); + output.write(copyBuf, 0, length); + } } - dup.dispose(); } public ByteBuffer readByteBuffer(final TupleInput input, int length) http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java ---------------------------------------------------------------------- diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java index 0fd3e44..0b9fa17 100644 --- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java +++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.store.berkeleydb.tuple; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; -import java.util.List; import com.sleepycat.bind.EntryBinding; import com.sleepycat.je.DatabaseEntry; @@ -61,15 +60,10 @@ public class MessageMetaDataBinding implements EntryBinding<StorableMessageMetaD final int metaDataType = stream.readByte() & 0xff; MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(metaDataType); - List<QpidByteBuffer> bufs = QpidByteBuffer.asQpidByteBuffers(stream); - - final StorableMessageMetaData metaData = type.createMetaData(bufs); - - for (final QpidByteBuffer buf : bufs) + try (QpidByteBuffer buf = QpidByteBuffer.asQpidByteBuffer(stream)) { - buf.dispose(); + return type.createMetaData(buf); } - return metaData; } catch (IOException | RuntimeException e) { @@ -83,12 +77,15 @@ public class MessageMetaDataBinding implements EntryBinding<StorableMessageMetaD final int bodySize = 1 + metaData.getStorableSize(); byte[] underlying = new byte[4+bodySize]; underlying[4] = (byte) metaData.getType().ordinal(); - QpidByteBuffer buf = QpidByteBuffer.wrap(underlying); - buf.putInt(bodySize ^ 0x80000000); - buf.position(5); - buf = buf.slice(); - - metaData.writeToBuffer(buf); + try (QpidByteBuffer buf = QpidByteBuffer.wrap(underlying)) + { + buf.putInt(bodySize ^ 0x80000000); + buf.position(5); + try (QpidByteBuffer bufSlice = buf.slice()) + { + metaData.writeToBuffer(bufSlice); + } + } entry.setData(underlying); } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java ---------------------------------------------------------------------- diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index da45856..91f698a 100644 --- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -24,36 +24,21 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.File; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collections; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.protocol.v0_8.AMQShortString; +import org.apache.qpid.server.protocol.v0_8.MessageMetaData; import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody; import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo; -import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10; -import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; -import org.apache.qpid.server.protocol.v0_8.MessageMetaData; -import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8; import org.apache.qpid.server.store.MessageHandle; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreTestCase; -import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost; -import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties; -import org.apache.qpid.server.protocol.v0_10.transport.Header; -import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode; -import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode; -import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode; -import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryPriority; -import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties; -import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer; import org.apache.qpid.server.util.FileUtils; +import org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost; /** * Subclass of MessageStoreTestCase which runs the standard tests from the superclass against @@ -78,164 +63,6 @@ public class BDBMessageStoreTest extends MessageStoreTestCase } } - /** - * Tests that message metadata and content are successfully read back from a - * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to - * verify their ability to co-exist within the store and be successful retrieved. - */ - public void testBDBMessagePersistence() throws Exception - { - MessageStore bdbStore = getStore(); - - // Create content ByteBuffers. - // Split the content into 2 chunks for the 0-8 message, as per broker behaviour. - // Use a single chunk for the 0-10 message as per broker behaviour. - String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf"; - - QpidByteBuffer firstContentBytes_0_8 = QpidByteBuffer.wrap(bodyText.substring(0, 10).getBytes()); - QpidByteBuffer secondContentBytes_0_8 = QpidByteBuffer.wrap(bodyText.substring(10).getBytes()); - - QpidByteBuffer completeContentBody_0_10 = QpidByteBuffer.wrap(bodyText.getBytes()); - int bodySize = completeContentBody_0_10.limit(); - - /* - * Create and insert a 0-8 message (metadata and multi-chunk content) - */ - MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8(); - BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8(); - - ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize); - - MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8); - MessageHandle<MessageMetaData> messageHandle_0_8 = bdbStore.addMessage(messageMetaData_0_8); - - long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime(); - - messageHandle_0_8.addContent(firstContentBytes_0_8); - messageHandle_0_8.addContent(secondContentBytes_0_8); - final StoredMessage<MessageMetaData> storedMessage_0_8 = messageHandle_0_8.allContentAdded(); - long messageid_0_8 = storedMessage_0_8.getMessageNumber(); - ((AbstractBDBMessageStore.StoredBDBMessage)messageHandle_0_8).flushToStore(); - - /* - * Create and insert a 0-10 message (metadata and content) - */ - MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize); - DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10(); - Header header_0_10 = new Header(delProps_0_10, msgProps_0_10); - - MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED, header_0_10, - Collections.singletonList(completeContentBody_0_10)); - - MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10); - MessageHandle<MessageMetaData_0_10> messageHandle_0_10 = bdbStore.addMessage(messageMetaData_0_10); - - long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime(); - - messageHandle_0_10.addContent(completeContentBody_0_10); - final StoredMessage<MessageMetaData_0_10> storedMessage_0_10 = messageHandle_0_10.allContentAdded(); - long messageid_0_10 = storedMessage_0_10.getMessageNumber(); - ((AbstractBDBMessageStore.StoredBDBMessage)messageHandle_0_10).flushToStore(); - - /* - * reload the store only (read-only) - */ - reopenStore(); - - /* - * Read back and validate the 0-8 message metadata and content - */ - BDBMessageStore reopenedBdbStore = (BDBMessageStore) getStore(); - StorableMessageMetaData storeableMMD_0_8 = reopenedBdbStore.getMessageMetaData(messageid_0_8); - - assertEquals("Unexpected message type", MessageMetaDataType_0_8.TYPE, storeableMMD_0_8.getType().ordinal()); - assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData); - MessageMetaData returnedMMD_0_8 = (MessageMetaData) storeableMMD_0_8; - - assertEquals("Message arrival time has changed", origArrivalTime_0_8, returnedMMD_0_8.getArrivalTime()); - - MessagePublishInfo returnedPubBody_0_8 = returnedMMD_0_8.getMessagePublishInfo(); - assertEquals("Message exchange has changed", pubInfoBody_0_8.getExchange(), returnedPubBody_0_8.getExchange()); - assertEquals("Immediate flag has changed", pubInfoBody_0_8.isImmediate(), returnedPubBody_0_8.isImmediate()); - assertEquals("Mandatory flag has changed", pubInfoBody_0_8.isMandatory(), returnedPubBody_0_8.isMandatory()); - assertEquals("Routing key has changed", pubInfoBody_0_8.getRoutingKey(), returnedPubBody_0_8.getRoutingKey()); - - ContentHeaderBody returnedHeaderBody_0_8 = returnedMMD_0_8.getContentHeaderBody(); - assertEquals("ContentHeader ClassID has changed", chb_0_8.getClassId(), returnedHeaderBody_0_8.getClassId()); - assertEquals("ContentHeader weight has changed", chb_0_8.getWeight(), returnedHeaderBody_0_8.getWeight()); - assertEquals("ContentHeader bodySize has changed", chb_0_8.getBodySize(), returnedHeaderBody_0_8.getBodySize()); - - BasicContentHeaderProperties returnedProperties_0_8 = returnedHeaderBody_0_8.getProperties(); - assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString()); - assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString()); - - ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.getBodySize()) ; - long recoveredCount_0_8 = reopenedBdbStore.getContent(messageid_0_8, 0, recoveredContent_0_8); - assertEquals("Incorrect amount of payload data recovered", chb_0_8.getBodySize(), recoveredCount_0_8); - String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array()); - assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8); - - /* - * Read back and validate the 0-10 message metadata and content - */ - StorableMessageMetaData storeableMMD_0_10 = reopenedBdbStore.getMessageMetaData(messageid_0_10); - - assertEquals("Unexpected message type", MessageMetaDataType_0_10.TYPE, storeableMMD_0_10.getType().ordinal()); - assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10); - MessageMetaData_0_10 returnedMMD_0_10 = (MessageMetaData_0_10) storeableMMD_0_10; - - assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime()); - - DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().getDeliveryProperties(); - assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10); - assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate()); - assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey()); - assertEquals("Message exchange has changed", delProps_0_10.getExchange(), returnedDelProps_0_10.getExchange()); - assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration()); - assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority()); - - MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().getMessageProperties(); - assertNotNull("MessageProperties were not returned", returnedMsgProps); - assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId())); - assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength()); - assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType()); - - ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ; - long recoveredCount = reopenedBdbStore.getContent(messageid_0_10, 0, recoveredContent); - assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount); - - String returnedPayloadString_0_10 = new String(recoveredContent.array()); - assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10); - - reopenedBdbStore.closeMessageStore(); - } - - private DeliveryProperties createDeliveryProperties_0_10() - { - DeliveryProperties delProps_0_10 = new DeliveryProperties(); - - delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT); - delProps_0_10.setImmediate(true); - delProps_0_10.setExchange("exchange12345"); - delProps_0_10.setRoutingKey("routingKey12345"); - delProps_0_10.setExpiration(5); - delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE); - - return delProps_0_10; - } - - private MessageProperties createMessageProperties_0_10(int bodySize) - { - MessageProperties msgProps_0_10 = new MessageProperties(); - msgProps_0_10.setContentLength(bodySize); - msgProps_0_10.setCorrelationId("qwerty".getBytes()); - msgProps_0_10.setContentType("text/html"); - - return msgProps_0_10; - } - - private MessagePublishInfo createPublishInfoBody_0_8() { return new MessagePublishInfo(new AMQShortString("exchange12345"), false, true, @@ -257,60 +84,6 @@ public class BDBMessageStoreTest extends MessageStoreTestCase return props; } - public void testGetContentWithOffset() throws Exception - { - BDBMessageStore bdbStore = (BDBMessageStore) getStore(); - StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore); - long messageid_0_8 = storedMessage_0_8.getMessageNumber(); - - // normal case: offset is 0 - ByteBuffer dst = ByteBuffer.allocate(10); - int length = bdbStore.getContent(messageid_0_8, 0, dst); - assertEquals("Unexpected length", CONTENT_BYTES.length, length); - byte[] array = dst.array(); - assertTrue("Unexpected content", Arrays.equals(CONTENT_BYTES, array)); - - // offset is in the middle - dst = ByteBuffer.allocate(10); - length = bdbStore.getContent(messageid_0_8, 5, dst); - assertEquals("Unexpected length", 5, length); - array = dst.array(); - byte[] expected = new byte[10]; - System.arraycopy(CONTENT_BYTES, 5, expected, 0, 5); - assertTrue("Unexpected content", Arrays.equals(expected, array)); - - // offset beyond the content length - dst = ByteBuffer.allocate(10); - try - { - bdbStore.getContent(messageid_0_8, 15, dst); - fail("Should fail for the offset greater than message size"); - } - catch (RuntimeException e) - { - assertEquals("Unexpected exception message", "Offset 15 is greater than message size 10 for message id " - + messageid_0_8 + "!", e.getCause().getMessage()); - } - - // buffer is smaller then message size - dst = ByteBuffer.allocate(5); - length = bdbStore.getContent(messageid_0_8, 0, dst); - assertEquals("Unexpected length", 5, length); - array = dst.array(); - expected = new byte[5]; - System.arraycopy(CONTENT_BYTES, 0, expected, 0, 5); - assertTrue("Unexpected content", Arrays.equals(expected, array)); - - // buffer is smaller then message size, offset is not 0 - dst = ByteBuffer.allocate(5); - length = bdbStore.getContent(messageid_0_8, 2, dst); - assertEquals("Unexpected length", 5, length); - array = dst.array(); - expected = new byte[5]; - System.arraycopy(CONTENT_BYTES, 2, expected, 0, 5); - assertTrue("Unexpected content", Arrays.equals(expected, array)); - } - /** * Tests that messages which are added to the store and then removed using the * public MessageStore interfaces are actually removed from the store by then @@ -338,11 +111,15 @@ public class BDBMessageStoreTest extends MessageStoreTestCase // pass since exception expected } - //expecting no content, allocate a 1 byte - ByteBuffer dst = ByteBuffer.allocate(1); - - assertEquals("Retrieved content when none was expected", - 0, bdbStore.getContent(messageid_0_8, 0, dst)); + try + { + bdbStore.getAllContent(messageid_0_8); + fail("Expected exception not thrown"); + } + catch (StoreException se) + { + // PASS + } } private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store) http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/NonPooledByteBufferRef.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/NonPooledByteBufferRef.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/NonPooledByteBufferRef.java index 2d0d668..27ad899 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/NonPooledByteBufferRef.java +++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/NonPooledByteBufferRef.java @@ -28,6 +28,10 @@ class NonPooledByteBufferRef implements ByteBufferRef NonPooledByteBufferRef(final ByteBuffer buffer) { + if (buffer == null) + { + throw new NullPointerException(); + } _buffer = buffer; } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/660c206d/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java index 01ad373..0126e20 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java +++ b/broker-core/src/main/java/org/apache/qpid/server/bytebuffer/PooledByteBufferRef.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.bytebuffer; import java.nio.ByteBuffer; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; @@ -43,6 +45,10 @@ class PooledByteBufferRef implements ByteBufferRef PooledByteBufferRef(final ByteBuffer buffer) { + if (buffer == null) + { + throw new NullPointerException(); + } _buffer = buffer; ACTIVE_BUFFERS.incrementAndGet(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org