Author: rgodfrey
Date: Sat Aug  8 18:26:14 2015
New Revision: 1694835

URL: http://svn.apache.org/r1694835
Log:
QPID-6662 : Prevent the underlying ByteBuffer from escaping, unless the buffer 
is also marked as no longer eligible for a pool

Removed:
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractNonBlockingConnectionDelegate.java
Modified:
    
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
    
qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java

Modified: 
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1694835&r1=1694834&r2=1694835&view=diff
==============================================================================
--- 
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
 (original)
+++ 
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
 Sat Aug  8 18:26:14 2015
@@ -124,7 +124,7 @@ public class BDBMessageStoreTest extends
         Header header_0_10 = new Header(delProps_0_10, msgProps_0_10);
 
         MessageTransfer xfr_0_10 = new MessageTransfer("destination", 
MessageAcceptMode.EXPLICIT,
-                MessageAcquireMode.PRE_ACQUIRED, header_0_10, 
completeContentBody_0_10.getNativeBuffer());
+                MessageAcquireMode.PRE_ACQUIRED, header_0_10, 
completeContentBody_0_10.asByteBuffer());
 
         MessageMetaData_0_10 messageMetaData_0_10 = new 
MessageMetaData_0_10(xfr_0_10);
         MessageHandle<MessageMetaData_0_10> messageHandle_0_10 = 
bdbStore.addMessage(messageMetaData_0_10);

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1694835&r1=1694834&r2=1694835&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
 Sat Aug  8 18:26:14 2015
@@ -50,7 +50,7 @@ public class NonBlockingConnection imple
     private final SocketChannel _socketChannel;
     private NonBlockingConnectionDelegate _delegate;
     private NetworkConnectionScheduler _scheduler;
-    private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new 
ConcurrentLinkedQueue<>();
+    private final ConcurrentLinkedQueue<QpidByteBuffer> _buffers = new 
ConcurrentLinkedQueue<>();
 
     private final String _remoteSocketAddress;
     private final AtomicBoolean _closed = new AtomicBoolean(false);
@@ -346,39 +346,37 @@ public class NonBlockingConnection imple
         }
     }
 
-    void writeToTransport(ByteBuffer[] buffers) throws IOException
+    long writeToTransport(Collection<QpidByteBuffer> buffers) throws 
IOException
     {
-        long written  = _socketChannel.write(buffers);
+        long written  = QpidByteBuffer.write(_socketChannel, buffers);
         if (LOGGER.isDebugEnabled())
         {
             LOGGER.debug("Written " + written + " bytes");
         }
+        return written;
     }
 
     private boolean doWrite() throws IOException
     {
-        ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()];
-        Iterator<ByteBuffer> bufferIterator = _buffers.iterator();
-        for (int i = 0; i < bufArray.length; i++)
+        final boolean result = _delegate.doWrite(_buffers);
+        while(!_buffers.isEmpty())
         {
-            bufArray[i] = bufferIterator.next();
+            QpidByteBuffer buf = _buffers.peek();
+            if(buf.hasRemaining())
+            {
+                break;
+            }
+            _buffers.poll();
         }
+        return result;
 
-        if (_delegate != null)
-        {
-            return _delegate.doWrite(bufArray);
-        }
-        else
-        {
-            return true;
-        }
     }
 
     protected int readFromNetwork() throws IOException
     {
         QpidByteBuffer buffer = _delegate.getNetInputBuffer();
 
-        int read = _socketChannel.read(buffer.getNativeBuffer());
+        int read = buffer.read(_socketChannel);
         if (read == -1)
         {
             _closed.set(true);
@@ -403,16 +401,10 @@ public class NonBlockingConnection imple
         }
         else if (msg.remaining() > 0)
         {
-            _buffers.add(msg.getNativeBuffer());
+            _buffers.add(msg);
         }
     }
 
-
-    public void writeBufferProcessed()
-    {
-        _buffers.poll();
-    }
-
     @Override
     public void flush()
     {

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java?rev=1694835&r1=1694834&r2=1694835&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionDelegate.java
 Sat Aug  8 18:26:14 2015
@@ -23,12 +23,13 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.Principal;
 import java.security.cert.Certificate;
+import java.util.Collection;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
 interface NonBlockingConnectionDelegate
 {
-    boolean doWrite(ByteBuffer[] bufferArray) throws IOException;
+    boolean doWrite(Collection<QpidByteBuffer> bufferArray) throws IOException;
 
     boolean readyForRead();
 
@@ -42,5 +43,4 @@ interface NonBlockingConnectionDelegate
 
     QpidByteBuffer getNetInputBuffer();
 
-    void setNetInputBuffer(QpidByteBuffer buffer);
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java?rev=1694835&r1=1694834&r2=1694835&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
 Sat Aug  8 18:26:14 2015
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.Principal;
 import java.security.cert.Certificate;
+import java.util.Collection;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,26 +83,17 @@ public class NonBlockingConnectionPlainD
 
 
     @Override
-    public boolean doWrite(ByteBuffer[] bufferArray) throws IOException
+    public boolean doWrite(Collection<QpidByteBuffer> bufferArray) throws 
IOException
     {
-        int byteBuffersWritten = 0;
+        long bytesToWrite = 0l;
 
-        _parent.writeToTransport(bufferArray);
-
-        for (ByteBuffer buf : bufferArray)
+        for(QpidByteBuffer buf : bufferArray)
         {
-            if (buf.remaining() == 0)
-            {
-                byteBuffersWritten++;
-                _parent.writeBufferProcessed();
-            }
-            else
-            {
-                break;
-            }
+            bytesToWrite += buf.remaining();
         }
+        final long actualWritten = _parent.writeToTransport(bufferArray);
+        return actualWritten >= bytesToWrite;
 
-        return bufferArray.length == byteBuffersWritten;
     }
 
     @Override
@@ -128,9 +120,4 @@ public class NonBlockingConnectionPlainD
         return _netInputBuffer;
     }
 
-    @Override
-    public void setNetInputBuffer(final QpidByteBuffer netInputBuffer)
-    {
-        _netInputBuffer = netInputBuffer;
-    }
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java?rev=1694835&r1=1694834&r2=1694835&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
 Sat Aug  8 18:26:14 2015
@@ -30,10 +30,11 @@ import javax.net.ssl.SSLEngineResult;
 import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLPeerUnverifiedException;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.security.Principal;
 import java.security.cert.Certificate;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 
@@ -45,7 +46,7 @@ public class NonBlockingConnectionTLSDel
     private final NonBlockingConnection _parent;
     private final int _initialApplicationBufferSize;
     private SSLEngineResult _status;
-    private final List<ByteBuffer> _encryptedOutput = new ArrayList<>();
+    private final List<QpidByteBuffer> _encryptedOutput = new ArrayList<>();
     private Principal _principal;
     private Certificate _peerCertificate;
     private boolean _principalChecked;
@@ -84,7 +85,7 @@ public class NonBlockingConnectionTLSDel
             int oldAppBufPos = _applicationBuffer.position();
             oldNetBufferPos = _netInputBuffer.position();
 
-            _status = _sslEngine.unwrap(_netInputBuffer.getNativeBuffer(), 
_applicationBuffer.getNativeBuffer());
+            _status = _netInputBuffer.decryptSSL(_sslEngine, 
_applicationBuffer);
             if (_status.getStatus() == SSLEngineResult.Status.CLOSED)
             {
                 // KW If SSLEngine changes state to CLOSED, what will ever set 
_closed to true?
@@ -117,18 +118,17 @@ public class NonBlockingConnectionTLSDel
     }
 
     @Override
-    public boolean doWrite(ByteBuffer[] bufferArray) throws IOException
+    public boolean doWrite(Collection<QpidByteBuffer> bufferArray) throws 
IOException
     {
+        final int bufCount = bufferArray.size();
         int byteBuffersWritten = wrapBufferArray(bufferArray);
 
-        ByteBuffer[] encryptedBuffers = _encryptedOutput.toArray(new 
ByteBuffer[_encryptedOutput.size()]);
+        _parent.writeToTransport(_encryptedOutput);
 
-        _parent.writeToTransport(encryptedBuffers);
-
-        ListIterator<ByteBuffer> iter = _encryptedOutput.listIterator();
+        ListIterator<QpidByteBuffer> iter = _encryptedOutput.listIterator();
         while(iter.hasNext())
         {
-            ByteBuffer buf = iter.next();
+            QpidByteBuffer buf = iter.next();
             if(buf.remaining() == 0)
             {
                 iter.remove();
@@ -139,7 +139,7 @@ public class NonBlockingConnectionTLSDel
             }
         }
 
-        return (bufferArray.length == byteBuffersWritten) && 
_encryptedOutput.isEmpty();
+        return (bufCount <= byteBuffersWritten) && _encryptedOutput.isEmpty();
     }
 
     protected void restoreApplicationBufferForWrite()
@@ -163,7 +163,7 @@ public class NonBlockingConnectionTLSDel
 
     }
 
-    private int wrapBufferArray(final ByteBuffer[] bufferArray) throws 
SSLException
+    private int wrapBufferArray(Collection<QpidByteBuffer> bufferArray) throws 
SSLException
     {
         int byteBuffersWritten = 0;
         int remaining = 0;
@@ -171,8 +171,8 @@ public class NonBlockingConnectionTLSDel
         {
             if(_sslEngine.getHandshakeStatus() != 
SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
             {
-                final ByteBuffer netBuffer = 
ByteBuffer.allocateDirect(_sslEngine.getSession().getPacketBufferSize());
-                _status = _sslEngine.wrap(bufferArray, netBuffer);
+                final QpidByteBuffer netBuffer = 
QpidByteBuffer.allocateDirect(_sslEngine.getSession().getPacketBufferSize());
+                _status = QpidByteBuffer.encryptSSL(_sslEngine,bufferArray, 
netBuffer);
                 runSSLEngineTasks(_status);
 
                 netBuffer.flip();
@@ -181,12 +181,14 @@ public class NonBlockingConnectionTLSDel
                 {
                     _encryptedOutput.add(netBuffer);
                 }
-                for (ByteBuffer buf : bufferArray)
+                Iterator<QpidByteBuffer> iter = bufferArray.iterator();
+                while (iter.hasNext())
                 {
+                    QpidByteBuffer buf = iter.next();
                     if (buf.remaining() == 0)
                     {
                         byteBuffersWritten++;
-                        _parent.writeBufferProcessed();
+                        iter.remove();
                     }
                     else
                     {
@@ -284,9 +286,4 @@ public class NonBlockingConnectionTLSDel
         return _netInputBuffer;
     }
 
-    @Override
-    public void setNetInputBuffer(final QpidByteBuffer netInputBuffer)
-    {
-        _netInputBuffer = netInputBuffer;
-    }
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java?rev=1694835&r1=1694834&r2=1694835&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
 Sat Aug  8 18:26:14 2015
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.Principal;
 import java.security.cert.Certificate;
+import java.util.Collection;
 
 public class NonBlockingConnectionUndecidedDelegate implements 
NonBlockingConnectionDelegate
 {
@@ -73,7 +74,7 @@ public class NonBlockingConnectionUndeci
     }
 
     @Override
-    public boolean doWrite(ByteBuffer[] bufferArray) throws IOException
+    public boolean doWrite(Collection<QpidByteBuffer> buffers) throws 
IOException
     {
         return true;
     }
@@ -128,9 +129,4 @@ public class NonBlockingConnectionUndeci
         return _netInputBuffer;
     }
 
-    @Override
-    public void setNetInputBuffer(final QpidByteBuffer netInputBuffer)
-    {
-        _netInputBuffer = netInputBuffer;
-    }
 }

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java?rev=1694835&r1=1694834&r2=1694835&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
 Sat Aug  8 18:26:14 2015
@@ -143,7 +143,7 @@ public class ProtocolEngine_1_0_0Test ex
             @Override
             public Object answer(final InvocationOnMock invocation) throws 
Throwable
             {
-                
_sentBuffers.add(byteBufferCaptor.getValue().getNativeBuffer());
+                _sentBuffers.add(byteBufferCaptor.getValue().asByteBuffer());
                 return null;
             }
         }).when(sender).send(byteBufferCaptor.capture());
@@ -249,7 +249,8 @@ public class ProtocolEngine_1_0_0Test ex
 
         createEngine(useSASL, Transport.TCP);
 
-        
_protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0_SASL.getInstance().getHeaderIdentifier()));
+        
_protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0_SASL.getInstance()
+                                                                   
.getHeaderIdentifier()));
 
         SaslInit init = new SaslInit();
         init.setMechanism(Symbol.valueOf("ANONYMOUS"));
@@ -260,7 +261,8 @@ public class ProtocolEngine_1_0_0Test ex
         buf.flip();
         _protocolEngine_1_0_0.received(buf);
 
-        
_protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier()));
+        
_protocolEngine_1_0_0.received(QpidByteBuffer.wrap(ProtocolEngineCreator_1_0_0.getInstance()
+                                                                   
.getHeaderIdentifier()));
 
         Open open = new Open();
         _frameWriter.setValue(AMQFrame.createAMQFrame((short)0,open));

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=1694835&r1=1694834&r2=1694835&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
 Sat Aug  8 18:26:14 2015
@@ -78,7 +78,7 @@ public abstract class AbstractJMSMessage
                     _logger.debug("Non-fragmented message body (bodySize=" + 
contentHeader.getBodySize() + ")");
                 }
 
-                data = ((ContentBody) 
bodies.get(0)).getPayload().getNativeBuffer().duplicate();
+                data = ((ContentBody) 
bodies.get(0)).getPayload().asByteBuffer().duplicate();
             }
             else if (bodies != null)
             {
@@ -93,7 +93,7 @@ public abstract class AbstractJMSMessage
                 while (it.hasNext())
                 {
                     ContentBody cb = (ContentBody) it.next();
-                    final ByteBuffer payload = 
cb.getPayload().getNativeBuffer().duplicate();
+                    final ByteBuffer payload = 
cb.getPayload().asByteBuffer().duplicate();
                     if (payload.isDirect() || payload.isReadOnly())
                     {
                         data.put(payload);

Modified: 
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java?rev=1694835&r1=1694834&r2=1694835&view=diff
==============================================================================
--- 
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java
 (original)
+++ 
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java
 Sat Aug  8 18:26:14 2015
@@ -29,4 +29,6 @@ public interface ByteBufferRef
     void decrementRef();
 
     ByteBuffer getBuffer();
+
+    void removeFromPool();
 }

Modified: 
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java?rev=1694835&r1=1694834&r2=1694835&view=diff
==============================================================================
--- 
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java
 (original)
+++ 
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java
 Sat Aug  8 18:26:14 2015
@@ -48,4 +48,10 @@ class NonPooledByteBufferRef implements
     {
         return _buffer;
     }
+
+    @Override
+    public void removeFromPool()
+    {
+
+    }
 }

Modified: 
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java?rev=1694835&r1=1694834&r2=1694835&view=diff
==============================================================================
--- 
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
 (original)
+++ 
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
 Sat Aug  8 18:26:14 2015
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.Atomi
 
 class PooledByteBufferRef implements ByteBufferRef
 {
-    private static final AtomicIntegerFieldUpdater<PooledByteBufferRef> 
UPDATER = AtomicIntegerFieldUpdater.newUpdater(PooledByteBufferRef.class, 
"_refCount");
+    private static final AtomicIntegerFieldUpdater<PooledByteBufferRef> 
REF_COUNT = AtomicIntegerFieldUpdater.newUpdater(PooledByteBufferRef.class, 
"_refCount");
 
     private final ByteBuffer _buffer;
     private volatile int _refCount;
@@ -38,13 +38,17 @@ class PooledByteBufferRef implements Byt
     @Override
     public void incrementRef()
     {
-        UPDATER.incrementAndGet(this);
+
+        if(REF_COUNT.get(this) >= 0)
+        {
+            REF_COUNT.incrementAndGet(this);
+        }
     }
 
     @Override
     public void decrementRef()
     {
-        if(UPDATER.decrementAndGet(this) == 0)
+        if(REF_COUNT.get(this) > 0 && REF_COUNT.decrementAndGet(this) == 0)
         {
             returnToPool(this);
         }
@@ -56,9 +60,14 @@ class PooledByteBufferRef implements Byt
         return _buffer.duplicate();
     }
 
-    private void returnToPool(final PooledByteBufferRef byteBufferRef)
+    @Override
+    public void removeFromPool()
     {
+        REF_COUNT.set(this, Integer.MIN_VALUE/2);
+    }
 
+    private void returnToPool(final PooledByteBufferRef byteBufferRef)
+    {
     }
 
 }

Modified: 
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1694835&r1=1694834&r2=1694835&view=diff
==============================================================================
--- 
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
 (original)
+++ 
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
 Sat Aug  8 18:26:14 2015
@@ -25,9 +25,17 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ReadableByteChannel;
 import java.nio.charset.Charset;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+
 import org.apache.qpid.codec.MarkableDataInput;
 import org.apache.qpid.framing.AMQShortString;
 
@@ -417,8 +425,9 @@ public final class QpidByteBuffer
     }
 
 
-    public ByteBuffer getNativeBuffer()
+    public ByteBuffer asByteBuffer()
     {
+        _ref.removeFromPool();
         return _buffer;
     }
 
@@ -427,6 +436,43 @@ public final class QpidByteBuffer
         return charset.decode(_buffer);
     }
 
+    public int read(ReadableByteChannel channel) throws IOException
+    {
+        return channel.read(_buffer);
+    }
+
+
+    public SSLEngineResult decryptSSL(SSLEngine engine, QpidByteBuffer dest) 
throws SSLException
+    {
+        return engine.unwrap(_buffer, dest._buffer);
+    }
+
+
+    public static SSLEngineResult encryptSSL(SSLEngine engine,
+                                             final Collection<QpidByteBuffer> 
buffers,
+                                             QpidByteBuffer dest) throws 
SSLException
+    {
+        final ByteBuffer[] src = new ByteBuffer[buffers.size()];
+        Iterator<QpidByteBuffer> iter = buffers.iterator();
+        for(int i = 0; i<src.length; i++)
+        {
+            src[i] = iter.next()._buffer;
+        }
+        return engine.wrap(src, dest._buffer);
+    }
+
+
+    public static long write(GatheringByteChannel channel, 
Collection<QpidByteBuffer> buffers) throws IOException
+    {
+        ByteBuffer[] bufs = new ByteBuffer[buffers.size()];
+        Iterator<QpidByteBuffer> bufIter = buffers.iterator();
+        for(int i = 0; i < bufs.length; i++)
+        {
+            bufs[i] = bufIter.next()._buffer;
+        }
+        return channel.write(bufs);
+    }
+
     public static QpidByteBuffer wrap(final ByteBuffer wrap)
     {
         return new QpidByteBuffer(new NonPooledByteBufferRef(wrap));

Modified: 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java?rev=1694835&r1=1694834&r2=1694835&view=diff
==============================================================================
--- 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
 (original)
+++ 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
 Sat Aug  8 18:26:14 2015
@@ -156,7 +156,7 @@ public class SSLSender implements ByteBu
             int read = 0;
             try
             {
-                SSLEngineResult result = 
engine.wrap(appData.getNativeBuffer(), netData);
+                SSLEngineResult result = engine.wrap(appData.asByteBuffer(), 
netData);
                 read   = result.bytesProduced();
                 status = result.getStatus();
                 handshakeStatus = result.getHandshakeStatus();

Modified: 
qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java?rev=1694835&r1=1694834&r2=1694835&view=diff
==============================================================================
--- 
qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java 
(original)
+++ 
qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java 
Sat Aug  8 18:26:14 2015
@@ -97,7 +97,7 @@ public class AMQDecoderTest extends Qpid
         {
             assertEquals(ContentBody.TYPE, ((AMQFrame) 
frames.get(0)).getBodyFrame().getFrameType());
             ContentBody decodedBody = (ContentBody) ((AMQFrame) 
frames.get(0)).getBodyFrame();
-            final ByteBuffer byteBuffer = 
decodedBody.getPayload().getNativeBuffer().duplicate();
+            final ByteBuffer byteBuffer = 
decodedBody.getPayload().asByteBuffer().duplicate();
             byte[] bodyBytes = new byte[byteBuffer.remaining()];
             byteBuffer.get(bodyBytes);
             assertTrue("Body was corrupted", Arrays.equals(payload, 
bodyBytes));



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to