This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push: new 10747f4a PROTON-2564 Reduece memory alloactions on send and receive paths 10747f4a is described below commit 10747f4a58819a29ba5fdcc09755b3f0e1440fd3 Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Tue Jun 14 17:58:27 2022 -0400 PROTON-2564 Reduece memory alloactions on send and receive paths Reduces the amount of memory allocations in the engine and client on both the send and receive paths during normal operations. --- .../protonj2/client/impl/ClientMessageSupport.java | 9 ++- .../protonj2/client/util/FifoDeliveryQueue.java | 75 +++++-------------- .../protonj2/client/impl/ClientMessageTest.java | 1 + .../codec/decoders/messaging/DataTypeDecoder.java | 6 +- .../ApplicationPropertiesTypeEncoder.java | 12 ++- .../codec/encoders/messaging/DataTypeEncoder.java | 9 +-- .../messaging/DeliveryAnnotationsTypeEncoder.java | 12 ++- .../encoders/messaging/FooterTypeEncoder.java | 12 ++- .../messaging/MessageAnnotationsTypeEncoder.java | 12 ++- .../codec/encoders/primitives/MapTypeEncoder.java | 22 ++---- .../apache/qpid/protonj2/types/messaging/Data.java | 86 ++++++++++++++++------ .../qpid/protonj2/codec/benchmark/Benchmark.java | 7 +- .../codec/messaging/DataTypeCodecTest.java | 2 +- .../qpid/protonj2/types/messaging/DataTest.java | 52 +++++++++++++ 14 files changed, 182 insertions(+), 135 deletions(-) diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientMessageSupport.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientMessageSupport.java index a7cba17d..cf4ad91f 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientMessageSupport.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientMessageSupport.java @@ -54,6 +54,11 @@ public abstract class ClientMessageSupport { private static final Encoder DEFAULT_ENCODER = CodecFactory.getDefaultEncoder(); private static final Decoder DEFAULT_DECODER = CodecFactory.getDefaultDecoder(); + private static final ThreadLocal<EncoderState> THREAD_LOCAL_ENCODER_STATE = + ThreadLocal.withInitial(() -> DEFAULT_ENCODER.newEncoderState()); + private static final ThreadLocal<DecoderState> THREAD_LOCAL_DECODER_STATE = + ThreadLocal.withInitial(() -> DEFAULT_DECODER.newDecoderState()); + //----- Message Conversion /** @@ -92,7 +97,7 @@ public abstract class ClientMessageSupport { //----- Message Encoding public static ProtonBuffer encodeMessage(AdvancedMessage<?> message, Map<String, Object> deliveryAnnotations) throws ClientException { - return encodeMessage(DEFAULT_ENCODER, DEFAULT_ENCODER.newEncoderState(), ProtonByteBufferAllocator.DEFAULT, message, deliveryAnnotations); + return encodeMessage(DEFAULT_ENCODER, THREAD_LOCAL_ENCODER_STATE.get(), ProtonByteBufferAllocator.DEFAULT, message, deliveryAnnotations); } public static ProtonBuffer encodeMessage(Encoder encoder, ProtonBufferAllocator allocator, AdvancedMessage<?> message, Map<String, Object> deliveryAnnotations) throws ClientException { @@ -136,7 +141,7 @@ public abstract class ClientMessageSupport { //----- Message Decoding public static Message<?> decodeMessage(ProtonBuffer buffer, Consumer<DeliveryAnnotations> daConsumer) throws ClientException { - return decodeMessage(DEFAULT_DECODER, DEFAULT_DECODER.newDecoderState(), buffer, daConsumer); + return decodeMessage(DEFAULT_DECODER, THREAD_LOCAL_DECODER_STATE.get(), buffer, daConsumer); } public static Message<?> decodeMessage(Decoder decoder, ProtonBuffer buffer, Consumer<DeliveryAnnotations> daConsumer) throws ClientException { diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java index 634b1ec6..91fbaebe 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/util/FifoDeliveryQueue.java @@ -20,8 +20,6 @@ import java.util.ArrayDeque; import java.util.Deque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; import org.apache.qpid.protonj2.client.Delivery; import org.apache.qpid.protonj2.client.impl.ClientDelivery; @@ -40,9 +38,6 @@ public final class FifoDeliveryQueue implements DeliveryQueue { private volatile int state = STOPPED; - private final ReentrantLock lock = new ReentrantLock(); - private final Condition condition = lock.newCondition(); - private final Deque<ClientDelivery> queue; /** @@ -57,38 +52,30 @@ public final class FifoDeliveryQueue implements DeliveryQueue { @Override public void enqueueFirst(ClientDelivery envelope) { - lock.lock(); - try { + synchronized (queue) { queue.addFirst(envelope); - condition.signal(); - } finally { - lock.unlock(); + queue.notify(); } } @Override public void enqueue(ClientDelivery envelope) { - lock.lock(); - try { + synchronized (queue) { queue.addLast(envelope); - condition.signal(); - } finally { - lock.unlock(); + queue.notify(); } } - @Override public ClientDelivery dequeue(long timeout) throws InterruptedException { - lock.lock(); - try { + synchronized (queue) { // Wait until the receiver is ready to deliver messages. while (timeout != 0 && isRunning() && queue.isEmpty()) { if (timeout == -1) { - condition.await(); + queue.wait(); } else { long start = System.currentTimeMillis(); - condition.await(timeout, TimeUnit.MILLISECONDS); + queue.wait(TimeUnit.MILLISECONDS.toMillis(timeout)); timeout = Math.max(timeout + start - System.currentTimeMillis(), 0); } } @@ -98,33 +85,25 @@ public final class FifoDeliveryQueue implements DeliveryQueue { } return queue.pollFirst(); - } finally { - lock.unlock(); } } @Override public ClientDelivery dequeueNoWait() { - lock.lock(); - try { + synchronized (queue) { if (!isRunning()) { return null; } return queue.pollFirst(); - } finally { - lock.unlock(); } } @Override public void start() { if (STATE_FIELD_UPDATER.compareAndSet(this, STOPPED, RUNNING)) { - lock.lock(); - try { - condition.signalAll(); - } finally { - lock.unlock(); + synchronized (queue) { + queue.notifyAll(); } } } @@ -132,11 +111,8 @@ public final class FifoDeliveryQueue implements DeliveryQueue { @Override public void stop() { if (STATE_FIELD_UPDATER.compareAndSet(this, RUNNING, STOPPED)) { - lock.lock(); - try { - condition.signalAll(); - } finally { - lock.unlock(); + synchronized (queue) { + queue.notifyAll(); } } } @@ -144,11 +120,8 @@ public final class FifoDeliveryQueue implements DeliveryQueue { @Override public void close() { if (STATE_FIELD_UPDATER.getAndSet(this, CLOSED) > CLOSED) { - lock.lock(); - try { - condition.signalAll(); - } finally { - lock.unlock(); + synchronized (queue) { + queue.notifyAll(); } } } @@ -165,41 +138,29 @@ public final class FifoDeliveryQueue implements DeliveryQueue { @Override public boolean isEmpty() { - lock.lock(); - try { + synchronized (queue) { return queue.isEmpty(); - } finally { - lock.unlock(); } } @Override public int size() { - lock.lock(); - try { + synchronized (queue) { return queue.size(); - } finally { - lock.unlock(); } } @Override public void clear() { - lock.lock(); - try { + synchronized (queue) { queue.clear(); - } finally { - lock.unlock(); } } @Override public String toString() { - lock.lock(); - try { + synchronized (queue) { return queue.toString(); - } finally { - lock.unlock(); } } } diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientMessageTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientMessageTest.java index 1a695b2c..c41a16f5 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientMessageTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientMessageTest.java @@ -366,6 +366,7 @@ class ClientMessageTest { message.bodySections().forEach(section -> { assertTrue(section instanceof Data); final Data dataView = (Data) section; + assertEquals(counter.get(), dataView.getBuffer().getArray()[0]); assertEquals(counter.getAndIncrement(), dataView.getBinary().getArray()[0]); }); } diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/messaging/DataTypeDecoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/messaging/DataTypeDecoder.java index 5b274670..b0221db5 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/messaging/DataTypeDecoder.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/messaging/DataTypeDecoder.java @@ -39,7 +39,7 @@ import org.apache.qpid.protonj2.types.messaging.Data; */ public final class DataTypeDecoder extends AbstractDescribedTypeDecoder<Data> { - private static final Data EMPTY_DATA = new Data((Binary) null); + private static final Data EMPTY_DATA = new Data((ProtonBuffer) null); @Override public Class<Data> getTypeClass() { @@ -86,7 +86,7 @@ public final class DataTypeDecoder extends AbstractDescribedTypeDecoder<Data> { data.setWriteIndex(size); buffer.setReadIndex(position + size); - return new Data(new Binary(data)); + return new Data(data); } @Override @@ -130,7 +130,7 @@ public final class DataTypeDecoder extends AbstractDescribedTypeDecoder<Data> { throw new DecodeException("Expected Binary type but found encoding: " + encodingCode); } - return new Data(new Binary(ProtonStreamUtils.readBytes(stream, size))); + return new Data(ProtonStreamUtils.readBytes(stream, size)); } @Override diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/ApplicationPropertiesTypeEncoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/ApplicationPropertiesTypeEncoder.java index 912a6ef9..f80b5626 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/ApplicationPropertiesTypeEncoder.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/ApplicationPropertiesTypeEncoder.java @@ -16,8 +16,6 @@ */ package org.apache.qpid.protonj2.codec.encoders.messaging; -import java.util.Map; - import org.apache.qpid.protonj2.buffer.ProtonBuffer; import org.apache.qpid.protonj2.codec.EncoderState; import org.apache.qpid.protonj2.codec.encoders.AbstractDescribedMapTypeEncoder; @@ -60,11 +58,11 @@ public final class ApplicationPropertiesTypeEncoder extends AbstractDescribedMap } @Override - public void writeMapEntries(ProtonBuffer buffer, EncoderState state, ApplicationProperties value) { + public void writeMapEntries(ProtonBuffer buffer, EncoderState state, ApplicationProperties properties) { // Write the Map elements and then compute total size written. - for (Map.Entry<String, Object> entry : value.getValue().entrySet()) { - state.getEncoder().writeString(buffer, state, entry.getKey()); - state.getEncoder().writeObject(buffer, state, entry.getValue()); - } + properties.getValue().forEach((key, value) -> { + state.getEncoder().writeString(buffer, state, key); + state.getEncoder().writeObject(buffer, state, value); + }); } } diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DataTypeEncoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DataTypeEncoder.java index a6ec3284..8038fc6e 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DataTypeEncoder.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DataTypeEncoder.java @@ -20,7 +20,6 @@ import org.apache.qpid.protonj2.buffer.ProtonBuffer; import org.apache.qpid.protonj2.codec.EncoderState; import org.apache.qpid.protonj2.codec.EncodingCodes; import org.apache.qpid.protonj2.codec.encoders.AbstractDescribedTypeEncoder; -import org.apache.qpid.protonj2.types.Binary; import org.apache.qpid.protonj2.types.Symbol; import org.apache.qpid.protonj2.types.UnsignedLong; import org.apache.qpid.protonj2.types.messaging.Data; @@ -51,7 +50,7 @@ public final class DataTypeEncoder extends AbstractDescribedTypeEncoder<Data> { buffer.writeByte(EncodingCodes.SMALLULONG); buffer.writeByte(Data.DESCRIPTOR_CODE.byteValue()); - state.getEncoder().writeBinary(buffer, state, value.getValue()); + state.getEncoder().writeBinary(buffer, state, value.getBuffer()); } @Override @@ -85,9 +84,9 @@ public final class DataTypeEncoder extends AbstractDescribedTypeEncoder<Data> { buffer.writeByte(EncodingCodes.VBIN32); for (Object value : values) { - final Binary binary = ((Data) value).getBinary(); - buffer.writeInt(binary.getLength()); - buffer.writeBytes(binary.getArray(), binary.getArrayOffset(), binary.getLength()); + final ProtonBuffer binary = ((Data) value).getBuffer(); + buffer.writeInt(binary.getReadableBytes()); + buffer.writeBytes(binary.getArray(), binary.getArrayOffset(), binary.getReadableBytes()); } } } diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DeliveryAnnotationsTypeEncoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DeliveryAnnotationsTypeEncoder.java index 774f8e6a..769a3ca6 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DeliveryAnnotationsTypeEncoder.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DeliveryAnnotationsTypeEncoder.java @@ -16,8 +16,6 @@ */ package org.apache.qpid.protonj2.codec.encoders.messaging; -import java.util.Map; - import org.apache.qpid.protonj2.buffer.ProtonBuffer; import org.apache.qpid.protonj2.codec.EncoderState; import org.apache.qpid.protonj2.codec.encoders.AbstractDescribedMapTypeEncoder; @@ -60,11 +58,11 @@ public final class DeliveryAnnotationsTypeEncoder extends AbstractDescribedMapTy } @Override - public void writeMapEntries(ProtonBuffer buffer, EncoderState state, DeliveryAnnotations value) { + public void writeMapEntries(ProtonBuffer buffer, EncoderState state, DeliveryAnnotations annotations) { // Write the Map elements and then compute total size written. - for (Map.Entry<Symbol, Object> entry : value.getValue().entrySet()) { - state.getEncoder().writeSymbol(buffer, state, entry.getKey()); - state.getEncoder().writeObject(buffer, state, entry.getValue()); - } + annotations.getValue().forEach((key, value) -> { + state.getEncoder().writeSymbol(buffer, state, key); + state.getEncoder().writeObject(buffer, state, value); + }); } } diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/FooterTypeEncoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/FooterTypeEncoder.java index 5bb2fd29..c584e537 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/FooterTypeEncoder.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/FooterTypeEncoder.java @@ -16,8 +16,6 @@ */ package org.apache.qpid.protonj2.codec.encoders.messaging; -import java.util.Map; - import org.apache.qpid.protonj2.buffer.ProtonBuffer; import org.apache.qpid.protonj2.codec.EncoderState; import org.apache.qpid.protonj2.codec.encoders.AbstractDescribedMapTypeEncoder; @@ -60,11 +58,11 @@ public final class FooterTypeEncoder extends AbstractDescribedMapTypeEncoder<Obj } @Override - public void writeMapEntries(ProtonBuffer buffer, EncoderState state, Footer value) { + public void writeMapEntries(ProtonBuffer buffer, EncoderState state, Footer footers) { // Write the Map elements and then compute total size written. - for (Map.Entry<Symbol, Object> entry : value.getValue().entrySet()) { - state.getEncoder().writeObject(buffer, state, entry.getKey()); - state.getEncoder().writeObject(buffer, state, entry.getValue()); - } + footers.getValue().forEach((key, value) -> { + state.getEncoder().writeObject(buffer, state, key); + state.getEncoder().writeObject(buffer, state, value); + }); } } diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/MessageAnnotationsTypeEncoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/MessageAnnotationsTypeEncoder.java index b4233aa7..02b692f1 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/MessageAnnotationsTypeEncoder.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/MessageAnnotationsTypeEncoder.java @@ -16,8 +16,6 @@ */ package org.apache.qpid.protonj2.codec.encoders.messaging; -import java.util.Map; - import org.apache.qpid.protonj2.buffer.ProtonBuffer; import org.apache.qpid.protonj2.codec.EncoderState; import org.apache.qpid.protonj2.codec.encoders.AbstractDescribedMapTypeEncoder; @@ -60,11 +58,11 @@ public final class MessageAnnotationsTypeEncoder extends AbstractDescribedMapTyp } @Override - public void writeMapEntries(ProtonBuffer buffer, EncoderState state, MessageAnnotations value) { + public void writeMapEntries(ProtonBuffer buffer, EncoderState state, MessageAnnotations annotations) { // Write the Map elements and then compute total size written. - for (Map.Entry<Symbol, Object> entry : value.getValue().entrySet()) { - state.getEncoder().writeSymbol(buffer, state, entry.getKey()); - state.getEncoder().writeObject(buffer, state, entry.getValue()); - } + annotations.getValue().forEach((key, value) -> { + state.getEncoder().writeSymbol(buffer, state, key); + state.getEncoder().writeObject(buffer, state, value); + }); } } diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/primitives/MapTypeEncoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/primitives/MapTypeEncoder.java index 0f34276c..80360148 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/primitives/MapTypeEncoder.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/primitives/MapTypeEncoder.java @@ -17,8 +17,6 @@ package org.apache.qpid.protonj2.codec.encoders.primitives; import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; import org.apache.qpid.protonj2.buffer.ProtonBuffer; import org.apache.qpid.protonj2.codec.EncodeException; @@ -62,25 +60,21 @@ public final class MapTypeEncoder extends AbstractPrimitiveTypeEncoder<Map> { buffer.writeInt(value.size() * 2); // Write the list elements and then compute total size written. - Set<Map.Entry> entries = value.entrySet(); - for (Entry entry : entries) { - Object entryKey = entry.getKey(); - Object entryValue = entry.getValue(); - - TypeEncoder keyEncoder = state.getEncoder().getTypeEncoder(entryKey); + value.forEach((key, entry) -> { + TypeEncoder keyEncoder = state.getEncoder().getTypeEncoder(key); if (keyEncoder == null) { - throw new EncodeException("Cannot find encoder for type " + entryKey); + throw new EncodeException("Cannot find encoder for type " + key); } - keyEncoder.writeType(buffer, state, entryKey); + keyEncoder.writeType(buffer, state, key); - TypeEncoder valueEncoder = state.getEncoder().getTypeEncoder(entryValue); + TypeEncoder valueEncoder = state.getEncoder().getTypeEncoder(entry); if (valueEncoder == null) { - throw new EncodeException("Cannot find encoder for type " + entryValue); + throw new EncodeException("Cannot find encoder for type " + entry); } - valueEncoder.writeType(buffer, state, entryValue); - } + valueEncoder.writeType(buffer, state, entry); + }); // Move back and write the size int endIndex = buffer.getWriteIndex(); diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/types/messaging/Data.java b/protonj2/src/main/java/org/apache/qpid/protonj2/types/messaging/Data.java index ff872ba7..1856828d 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/types/messaging/Data.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/types/messaging/Data.java @@ -17,6 +17,7 @@ package org.apache.qpid.protonj2.types.messaging; import org.apache.qpid.protonj2.buffer.ProtonBuffer; +import org.apache.qpid.protonj2.buffer.ProtonByteBufferAllocator; import org.apache.qpid.protonj2.types.Binary; import org.apache.qpid.protonj2.types.Symbol; import org.apache.qpid.protonj2.types.UnsignedLong; @@ -26,30 +27,51 @@ public final class Data implements Section<byte[]> { public static final UnsignedLong DESCRIPTOR_CODE = UnsignedLong.valueOf(0x0000000000000075L); public static final Symbol DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:data:binary"); - private final Binary value; + private final ProtonBuffer buffer; - public Data(Binary value) { - this.value = value; + private Binary cachedBinary; + + public Data(Binary binary) { + this.buffer = binary != null ? binary.asProtonBuffer() : null; + this.cachedBinary = binary; } - public Data(ProtonBuffer value) { - this.value = value != null ? new Binary(value) : null; + public Data(ProtonBuffer buffer) { + this.buffer = buffer; } public Data(byte[] value) { - this.value = value != null ? new Binary(value) : null; + this.buffer = value != null ? ProtonByteBufferAllocator.DEFAULT.wrap(value) : null; } public Data(byte[] value, int offset, int length) { - this.value = value != null ? new Binary(value, offset, length) : null; + this.buffer = value != null ? ProtonByteBufferAllocator.DEFAULT.wrap(value, offset, length) : null; } public Data copy() { - return new Data(value == null ? null : value.copy()); + return new Data(buffer == null ? null : buffer.copy()); } public Binary getBinary() { - return value; + if (cachedBinary != null || buffer == null) { + return cachedBinary; + } else { + return cachedBinary = new Binary(buffer); + } + } + + /** + * Returns the {@link ProtonBuffer} that contains the bytes carried in the {@link Data} section. + * If the section carries no bytes then this method returns null. This method allows the {@link Data} + * section to be considered a carrier of {@link ProtonBuffer} types instead of the {@link Binary} + * value it will encode as part of its body and avoids creation of a Binary object when one is not + * needed. If a Binary instance is required then calling the {@link #getBinary()} method will create + * an instance that wraps the internal {@link ProtonBuffer}. + * + * @return the {@link ProtonBuffer} that back this Data section. + */ + public ProtonBuffer getBuffer() { + return buffer; } /** @@ -61,16 +83,42 @@ public final class Data implements Section<byte[]> { */ @Override public byte[] getValue() { - if (value != null && value.hasArray() && value.getArrayOffset() == 0 && value.getLength() == value.getArray().length) { - return value.getArray(); + if (buffer != null && buffer.hasArray() && buffer.getArrayOffset() == 0 && buffer.getReadableBytes() == buffer.getArray().length) { + return buffer.getArray(); } else { - return value != null ? value.arrayCopy() : null; + byte[] dataCopy = null; + if (buffer != null) { + dataCopy = new byte[buffer.getReadableBytes()]; + buffer.getBytes(buffer.getReadIndex(), dataCopy); + } + + return dataCopy; } } @Override public String toString() { - return "Data{ " + value + " }"; + if (buffer == null) { + return ""; + } + + StringBuilder str = new StringBuilder(); + + str.append("Data{ "); + + for (int i = 0; i < buffer.getReadableBytes(); i++) { + byte c = buffer.getByte(i); + + if (c > 31 && c < 127 && c != '\\') { + str.append((char) c); + } else { + str.append(String.format("\\x%02x", c)); + } + } + + str.append(" }"); + + return str.toString(); } @Override @@ -82,7 +130,7 @@ public final class Data implements Section<byte[]> { public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((value == null) ? 0 : value.hashCode()); + result = prime * result + ((buffer == null) ? 0 : buffer.hashCode()); return result; } @@ -99,14 +147,10 @@ public final class Data implements Section<byte[]> { } Data other = (Data) obj; - if (value == null) { - if (other.value != null) { - return false; - } - } else if (!value.equals(other.value)) { - return false; + if (buffer == null) { + return other.buffer == null; } - return true; + return buffer.equals(other.buffer); } } diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/codec/benchmark/Benchmark.java b/protonj2/src/test/java/org/apache/qpid/protonj2/codec/benchmark/Benchmark.java index 5e4eff69..669690ab 100644 --- a/protonj2/src/test/java/org/apache/qpid/protonj2/codec/benchmark/Benchmark.java +++ b/protonj2/src/test/java/org/apache/qpid/protonj2/codec/benchmark/Benchmark.java @@ -34,7 +34,6 @@ import org.apache.qpid.protonj2.codec.Decoder; import org.apache.qpid.protonj2.codec.DecoderState; import org.apache.qpid.protonj2.codec.Encoder; import org.apache.qpid.protonj2.codec.EncoderState; -import org.apache.qpid.protonj2.types.Binary; import org.apache.qpid.protonj2.types.Symbol; import org.apache.qpid.protonj2.types.UnsignedByte; import org.apache.qpid.protonj2.types.UnsignedInteger; @@ -367,9 +366,9 @@ public class Benchmark implements Runnable { } private void benchmarkData() throws IOException { - Data data1 = new Data(new Binary(new byte[] {1, 2, 3})); - Data data2 = new Data(new Binary(new byte[] {4, 5, 6})); - Data data3 = new Data(new Binary(new byte[] {7, 8, 9})); + Data data1 = new Data(new byte[] {1, 2, 3}); + Data data2 = new Data(new byte[] {4, 5, 6}); + Data data3 = new Data(new byte[] {7, 8, 9}); resultSet.start(); for (int i = 0; i < ITERATIONS; i++) { diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/codec/messaging/DataTypeCodecTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/codec/messaging/DataTypeCodecTest.java index fd878ee5..5efa51a9 100644 --- a/protonj2/src/test/java/org/apache/qpid/protonj2/codec/messaging/DataTypeCodecTest.java +++ b/protonj2/src/test/java/org/apache/qpid/protonj2/codec/messaging/DataTypeCodecTest.java @@ -98,7 +98,7 @@ public class DataTypeCodecTest extends CodecTestSupport { ProtonBuffer buffer = ProtonByteBufferAllocator.DEFAULT.allocate(); InputStream stream = new ProtonBufferInputStream(buffer); - Data data = new Data(new Binary(new byte[] { 1, 2, 3})); + Data data = new Data(new byte[] { 1, 2, 3}); for (int i = 0; i < size; ++i) { encoder.writeObject(buffer, encoderState, data); diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/types/messaging/DataTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/types/messaging/DataTest.java index 4e1f2b7b..d57111a1 100644 --- a/protonj2/src/test/java/org/apache/qpid/protonj2/types/messaging/DataTest.java +++ b/protonj2/src/test/java/org/apache/qpid/protonj2/types/messaging/DataTest.java @@ -42,6 +42,11 @@ public class DataTest { assertNull(new Data((byte[]) null).getValue()); } + @Test + public void testCopyFromEmptyProtonBuffer() { + assertNull(new Data((ProtonBuffer) null).copy().getBinary()); + } + @Test public void testCopyFromEmpty() { assertNull(new Data((Binary) null).copy().getBinary()); @@ -78,6 +83,25 @@ public class DataTest { assertEquals(new Data((Binary) null).hashCode(), new Data((ProtonBuffer) null).hashCode()); } + @Test + public void testHashCodeWithProtonBuffer() { + byte[] bytes = new byte[] { 1 }; + Data data = new Data(bytes); + Data copy = data.copy(); + + assertNotNull(copy.getValue()); + assertNotSame(data.getValue(), copy.getValue()); + + assertEquals(data.hashCode(), copy.hashCode()); + + Data second = new Data(new byte[] { 1, 2, 3 }); + + assertNotEquals(data.hashCode(), second.hashCode()); + + assertNotEquals(new Data((ProtonBuffer) null).hashCode(), data.hashCode()); + assertEquals(new Data((ProtonBuffer) null).hashCode(), new Data((ProtonBuffer) null).hashCode()); + } + @Test public void testEquals() { byte[] bytes = new byte[] { 1 }; @@ -107,6 +131,34 @@ public class DataTest { assertEquals(new Data((Binary) null), new Data((ProtonBuffer) null)); } + @Test + public void testEqualsWithoutBinary() { + byte[] bytes = new byte[] { 1 }; + Data data = new Data(bytes); + Data copy = data.copy(); + + assertNotNull(copy.getValue()); + assertNotSame(data.getValue(), copy.getValue()); + + assertEquals(data, data); + assertEquals(data, copy); + + Data second = new Data(ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] { 1, 2, 3 })); + Data third = new Data(new byte[] { 1, 2, 3 }, 0, 3); + Data fourth = new Data(new byte[] { 1, 2, 3 }, 0, 1); + Data fifth = new Data(null, 0, 0); + + assertNotEquals(data, second); + assertNotEquals(data, third); + assertNotEquals(data, fifth); + assertEquals(data, fourth); + assertFalse(data.equals(null)); + assertNotEquals(data, "not a data"); + assertNotEquals(data, new Data((byte[]) null)); + assertNotEquals(new Data((ProtonBuffer) null), data); + assertEquals(new Data((byte[]) null), new Data((ProtonBuffer) null)); + } + @Test public void testGetValueWhenUsingAnArrayView() { Data view = new Data(new byte[] { 1, 2, 3 }, 0, 1); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org