Author: rgodfrey Date: Sat Aug 8 18:26:14 2015 New Revision: 1694835 URL: http://svn.apache.org/r1694835 Log: QPID-6662 : Prevent the underlying ByteBuffer from escaping, unless the buffer is also marked as no longer eligible for a pool
Removed: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractNonBlockingConnectionDelegate.java Modified: qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java Modified: qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1694835&r1=1694834&r2=1694835&view=diff ============================================================================== --- qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (original) +++ qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Sat Aug 8 18:26:14 2015 @@ -124,7 +124,7 @@ public class BDBMessageStoreTest extends Header header_0_10 = new Header(delProps_0_10, msgProps_0_10); MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT, - MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10.getNativeBuffer()); + MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10.asByteBuffer()); MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10); MessageHandle<MessageMetaData_0_10> messageHandle_0_10 = bdbStore.addMessage(messageMetaData_0_10); Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1694835&r1=1694834&r2=1694835&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Sat Aug 8 18:26:14 2015 @@ -50,7 +50,7 @@ public class NonBlockingConnection imple private final SocketChannel _socketChannel; private NonBlockingConnectionDelegate _delegate; private NetworkConnectionScheduler _scheduler; - private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue<QpidByteBuffer> _buffers = new ConcurrentLinkedQueue<>(); private final String _remoteSocketAddress; private final AtomicBoolean _closed = new AtomicBoolean(false); @@ -346,39 +346,37 @@ public class NonBlockingConnection imple } } - void writeToTransport(ByteBuffer[] buffers) throws IOException + long writeToTransport(Collection<QpidByteBuffer> buffers) throws IOException { - long written = _socketChannel.write(buffers); + long written = QpidByteBuffer.write(_socketChannel, buffers); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Written " + written + " bytes"); } + return written; } private boolean doWrite() throws IOException { - ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()]; - Iterator<ByteBuffer> bufferIterator = _buffers.iterator(); - for (int i = 0; i < bufArray.length; i++) + final boolean result = _delegate.doWrite(_buffers); + while(!_buffers.isEmpty()) { - bufArray[i] = bufferIterator.next(); + QpidByteBuffer buf = _buffers.peek(); + if(buf.hasRemaining()) + { + break; + } + _buffers.poll(); } + return result; - if (_delegate != null) - { - return _delegate.doWrite(bufArray); - } - else - { - return true; - } } protected int readFromNetwork() throws IOException { QpidByteBuffer buffer = _delegate.getNetInputBuffer(); - int read = _socketChannel.read(buffer.getNativeBuffer()); + int read = buffer.read(_socketChannel); if (read == -1) { _closed.set(true); @@ -403,16 +401,10 @@ public class NonBlockingConnection imple } else if (msg.remaining() > 0) { - _buffers.add(msg.getNativeBuffer()); + _buffers.add(msg); } } - - public void writeBufferProcessed() - { - _buffers.poll(); - } - @Override public void flush() { Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java?rev=1694835&r1=1694834&r2=1694835&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java Sat Aug 8 18:26:14 2015 @@ -23,12 +23,13 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.security.Principal; import java.security.cert.Certificate; +import java.util.Collection; import org.apache.qpid.bytebuffer.QpidByteBuffer; interface NonBlockingConnectionDelegate { - boolean doWrite(ByteBuffer[] bufferArray) throws IOException; + boolean doWrite(Collection<QpidByteBuffer> bufferArray) throws IOException; boolean readyForRead(); @@ -42,5 +43,4 @@ interface NonBlockingConnectionDelegate QpidByteBuffer getNetInputBuffer(); - void setNetInputBuffer(QpidByteBuffer buffer); } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java?rev=1694835&r1=1694834&r2=1694835&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java Sat Aug 8 18:26:14 2015 @@ -23,6 +23,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.security.Principal; import java.security.cert.Certificate; +import java.util.Collection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,26 +83,17 @@ public class NonBlockingConnectionPlainD @Override - public boolean doWrite(ByteBuffer[] bufferArray) throws IOException + public boolean doWrite(Collection<QpidByteBuffer> bufferArray) throws IOException { - int byteBuffersWritten = 0; + long bytesToWrite = 0l; - _parent.writeToTransport(bufferArray); - - for (ByteBuffer buf : bufferArray) + for(QpidByteBuffer buf : bufferArray) { - if (buf.remaining() == 0) - { - byteBuffersWritten++; - _parent.writeBufferProcessed(); - } - else - { - break; - } + bytesToWrite += buf.remaining(); } + final long actualWritten = _parent.writeToTransport(bufferArray); + return actualWritten >= bytesToWrite; - return bufferArray.length == byteBuffersWritten; } @Override @@ -128,9 +120,4 @@ public class NonBlockingConnectionPlainD return _netInputBuffer; } - @Override - public void setNetInputBuffer(final QpidByteBuffer netInputBuffer) - { - _netInputBuffer = netInputBuffer; - } } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java?rev=1694835&r1=1694834&r2=1694835&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java Sat Aug 8 18:26:14 2015 @@ -30,10 +30,11 @@ import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLException; import javax.net.ssl.SSLPeerUnverifiedException; import java.io.IOException; -import java.nio.ByteBuffer; import java.security.Principal; import java.security.cert.Certificate; import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.ListIterator; @@ -45,7 +46,7 @@ public class NonBlockingConnectionTLSDel private final NonBlockingConnection _parent; private final int _initialApplicationBufferSize; private SSLEngineResult _status; - private final List<ByteBuffer> _encryptedOutput = new ArrayList<>(); + private final List<QpidByteBuffer> _encryptedOutput = new ArrayList<>(); private Principal _principal; private Certificate _peerCertificate; private boolean _principalChecked; @@ -84,7 +85,7 @@ public class NonBlockingConnectionTLSDel int oldAppBufPos = _applicationBuffer.position(); oldNetBufferPos = _netInputBuffer.position(); - _status = _sslEngine.unwrap(_netInputBuffer.getNativeBuffer(), _applicationBuffer.getNativeBuffer()); + _status = _netInputBuffer.decryptSSL(_sslEngine, _applicationBuffer); if (_status.getStatus() == SSLEngineResult.Status.CLOSED) { // KW If SSLEngine changes state to CLOSED, what will ever set _closed to true? @@ -117,18 +118,17 @@ public class NonBlockingConnectionTLSDel } @Override - public boolean doWrite(ByteBuffer[] bufferArray) throws IOException + public boolean doWrite(Collection<QpidByteBuffer> bufferArray) throws IOException { + final int bufCount = bufferArray.size(); int byteBuffersWritten = wrapBufferArray(bufferArray); - ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new ByteBuffer[_encryptedOutput.size()]); + _parent.writeToTransport(_encryptedOutput); - _parent.writeToTransport(encryptedBuffers); - - ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator(); + ListIterator<QpidByteBuffer> iter = _encryptedOutput.listIterator(); while(iter.hasNext()) { - ByteBuffer buf = iter.next(); + QpidByteBuffer buf = iter.next(); if(buf.remaining() == 0) { iter.remove(); @@ -139,7 +139,7 @@ public class NonBlockingConnectionTLSDel } } - return (bufferArray.length == byteBuffersWritten) && _encryptedOutput.isEmpty(); + return (bufCount <= byteBuffersWritten) && _encryptedOutput.isEmpty(); } protected void restoreApplicationBufferForWrite() @@ -163,7 +163,7 @@ public class NonBlockingConnectionTLSDel } - private int wrapBufferArray(final ByteBuffer[] bufferArray) throws SSLException + private int wrapBufferArray(Collection<QpidByteBuffer> bufferArray) throws SSLException { int byteBuffersWritten = 0; int remaining = 0; @@ -171,8 +171,8 @@ public class NonBlockingConnectionTLSDel { if(_sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) { - final ByteBuffer netBuffer = ByteBuffer.allocateDirect(_sslEngine.getSession().getPacketBufferSize()); - _status = _sslEngine.wrap(bufferArray, netBuffer); + final QpidByteBuffer netBuffer = QpidByteBuffer.allocateDirect(_sslEngine.getSession().getPacketBufferSize()); + _status = QpidByteBuffer.encryptSSL(_sslEngine,bufferArray, netBuffer); runSSLEngineTasks(_status); netBuffer.flip(); @@ -181,12 +181,14 @@ public class NonBlockingConnectionTLSDel { _encryptedOutput.add(netBuffer); } - for (ByteBuffer buf : bufferArray) + Iterator<QpidByteBuffer> iter = bufferArray.iterator(); + while (iter.hasNext()) { + QpidByteBuffer buf = iter.next(); if (buf.remaining() == 0) { byteBuffersWritten++; - _parent.writeBufferProcessed(); + iter.remove(); } else { @@ -284,9 +286,4 @@ public class NonBlockingConnectionTLSDel return _netInputBuffer; } - @Override - public void setNetInputBuffer(final QpidByteBuffer netInputBuffer) - { - _netInputBuffer = netInputBuffer; - } } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java?rev=1694835&r1=1694834&r2=1694835&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java Sat Aug 8 18:26:14 2015 @@ -26,6 +26,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.security.Principal; import java.security.cert.Certificate; +import java.util.Collection; public class NonBlockingConnectionUndecidedDelegate implements NonBlockingConnectionDelegate { @@ -73,7 +74,7 @@ public class NonBlockingConnectionUndeci } @Override - public boolean doWrite(ByteBuffer[] bufferArray) throws IOException + public boolean doWrite(Collection<QpidByteBuffer> buffers) throws IOException { return true; } @@ -128,9 +129,4 @@ public class NonBlockingConnectionUndeci return _netInputBuffer; } - @Override - public void setNetInputBuffer(final QpidByteBuffer netInputBuffer) - { - _netInputBuffer = netInputBuffer; - } } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java?rev=1694835&r1=1694834&r2=1694835&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java Sat Aug 8 18:26:14 2015 @@ -143,7 +143,7 @@ public class ProtocolEngine_1_0_0Test ex @Override public Object answer(final InvocationOnMock invocation) throws Throwable { - _sentBuffers.add(byteBufferCaptor.getValue().getNativeBuffer()); + _sentBuffers.add(byteBufferCaptor.getValue().asByteBuffer()); return null; } }).when(sender).send(byteBufferCaptor.capture()); @@ -249,7 +249,8 @@ public class ProtocolEngine_1_0_0Test ex createEngine(useSASL, Transport.TCP); - _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0_SASL.getInstance().getHeaderIdentifier())); + _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0_SASL.getInstance() + .getHeaderIdentifier())); SaslInit init = new SaslInit(); init.setMechanism(Symbol.valueOf("ANONYMOUS")); @@ -260,7 +261,8 @@ public class ProtocolEngine_1_0_0Test ex buf.flip(); _protocolEngine_1_0_0.received(buf); - _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier())); + _protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance() + .getHeaderIdentifier())); Open open = new Open(); _frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open)); Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=1694835&r1=1694834&r2=1694835&view=diff ============================================================================== --- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original) +++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Sat Aug 8 18:26:14 2015 @@ -78,7 +78,7 @@ public abstract class AbstractJMSMessage _logger.debug("Non-fragmented message body (bodySize=" + contentHeader.getBodySize() + ")"); } - data = ((ContentBody) bodies.get(0)).getPayload().getNativeBuffer().duplicate(); + data = ((ContentBody) bodies.get(0)).getPayload().asByteBuffer().duplicate(); } else if (bodies != null) { @@ -93,7 +93,7 @@ public abstract class AbstractJMSMessage while (it.hasNext()) { ContentBody cb = (ContentBody) it.next(); - final ByteBuffer payload = cb.getPayload().getNativeBuffer().duplicate(); + final ByteBuffer payload = cb.getPayload().asByteBuffer().duplicate(); if (payload.isDirect() || payload.isReadOnly()) { data.put(payload); Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java?rev=1694835&r1=1694834&r2=1694835&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java Sat Aug 8 18:26:14 2015 @@ -29,4 +29,6 @@ public interface ByteBufferRef void decrementRef(); ByteBuffer getBuffer(); + + void removeFromPool(); } Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java?rev=1694835&r1=1694834&r2=1694835&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java Sat Aug 8 18:26:14 2015 @@ -48,4 +48,10 @@ class NonPooledByteBufferRef implements { return _buffer; } + + @Override + public void removeFromPool() + { + + } } Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java?rev=1694835&r1=1694834&r2=1694835&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java Sat Aug 8 18:26:14 2015 @@ -25,7 +25,7 @@ import java.util.concurrent.atomic.Atomi class PooledByteBufferRef implements ByteBufferRef { - private static final AtomicIntegerFieldUpdater<PooledByteBufferRef> UPDATER = AtomicIntegerFieldUpdater.newUpdater(PooledByteBufferRef.class, "_refCount"); + private static final AtomicIntegerFieldUpdater<PooledByteBufferRef> REF_COUNT = AtomicIntegerFieldUpdater.newUpdater(PooledByteBufferRef.class, "_refCount"); private final ByteBuffer _buffer; private volatile int _refCount; @@ -38,13 +38,17 @@ class PooledByteBufferRef implements Byt @Override public void incrementRef() { - UPDATER.incrementAndGet(this); + + if(REF_COUNT.get(this) >= 0) + { + REF_COUNT.incrementAndGet(this); + } } @Override public void decrementRef() { - if(UPDATER.decrementAndGet(this) == 0) + if(REF_COUNT.get(this) > 0 && REF_COUNT.decrementAndGet(this) == 0) { returnToPool(this); } @@ -56,9 +60,14 @@ class PooledByteBufferRef implements Byt return _buffer.duplicate(); } - private void returnToPool(final PooledByteBufferRef byteBufferRef) + @Override + public void removeFromPool() { + REF_COUNT.set(this, Integer.MIN_VALUE/2); + } + private void returnToPool(final PooledByteBufferRef byteBufferRef) + { } } Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1694835&r1=1694834&r2=1694835&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java Sat Aug 8 18:26:14 2015 @@ -25,9 +25,17 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.CharBuffer; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ReadableByteChannel; import java.nio.charset.Charset; +import java.util.Collection; +import java.util.Iterator; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLException; + import org.apache.qpid.codec.MarkableDataInput; import org.apache.qpid.framing.AMQShortString; @@ -417,8 +425,9 @@ public final class QpidByteBuffer } - public ByteBuffer getNativeBuffer() + public ByteBuffer asByteBuffer() { + _ref.removeFromPool(); return _buffer; } @@ -427,6 +436,43 @@ public final class QpidByteBuffer return charset.decode(_buffer); } + public int read(ReadableByteChannel channel) throws IOException + { + return channel.read(_buffer); + } + + + public SSLEngineResult decryptSSL(SSLEngine engine, QpidByteBuffer dest) throws SSLException + { + return engine.unwrap(_buffer, dest._buffer); + } + + + public static SSLEngineResult encryptSSL(SSLEngine engine, + final Collection<QpidByteBuffer> buffers, + QpidByteBuffer dest) throws SSLException + { + final ByteBuffer[] src = new ByteBuffer[buffers.size()]; + Iterator<QpidByteBuffer> iter = buffers.iterator(); + for(int i = 0; i<src.length; i++) + { + src[i] = iter.next()._buffer; + } + return engine.wrap(src, dest._buffer); + } + + + public static long write(GatheringByteChannel channel, Collection<QpidByteBuffer> buffers) throws IOException + { + ByteBuffer[] bufs = new ByteBuffer[buffers.size()]; + Iterator<QpidByteBuffer> bufIter = buffers.iterator(); + for(int i = 0; i < bufs.length; i++) + { + bufs[i] = bufIter.next()._buffer; + } + return channel.write(bufs); + } + public static QpidByteBuffer wrap(final ByteBuffer wrap) { return new QpidByteBuffer(new NonPooledByteBufferRef(wrap)); Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java?rev=1694835&r1=1694834&r2=1694835&view=diff ============================================================================== --- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java (original) +++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java Sat Aug 8 18:26:14 2015 @@ -156,7 +156,7 @@ public class SSLSender implements ByteBu int read = 0; try { - SSLEngineResult result = engine.wrap(appData.getNativeBuffer(), netData); + SSLEngineResult result = engine.wrap(appData.asByteBuffer(), netData); read = result.bytesProduced(); status = result.getStatus(); handshakeStatus = result.getHandshakeStatus(); Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java?rev=1694835&r1=1694834&r2=1694835&view=diff ============================================================================== --- qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java (original) +++ qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java Sat Aug 8 18:26:14 2015 @@ -97,7 +97,7 @@ public class AMQDecoderTest extends Qpid { assertEquals(ContentBody.TYPE, ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType()); ContentBody decodedBody = (ContentBody) ((AMQFrame) frames.get(0)).getBodyFrame(); - final ByteBuffer byteBuffer = decodedBody.getPayload().getNativeBuffer().duplicate(); + final ByteBuffer byteBuffer = decodedBody.getPayload().asByteBuffer().duplicate(); byte[] bodyBytes = new byte[byteBuffer.remaining()]; byteBuffer.get(bodyBytes); assertTrue("Body was corrupted", Arrays.equals(payload, bodyBytes)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org