Author: rgodfrey
Date: Tue Dec 9 10:00:24 2014
New Revision: 1644014

URL: http://svn.apache.org/r1644014
Log:
Ensure selector is closed, continue to use same backing buffer on read until it is full

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java?rev=1644014&r1=1644013&r2=1644014&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java Tue Dec 9 10:00:24 2014 @@ -20,8 +20,6 @@ package org.apache.qpid.transport.network.io; import java.io.IOException; -import java.net.SocketException; -import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; @@ -56,6 +54,7 @@ public class NonBlockingSenderReceiver private final int _receiveBufSize; private final Ticker _ticker; + private ByteBuffer _currentBuffer; public NonBlockingSenderReceiver(final SocketChannel socketChannel, Receiver<ByteBuffer> receiver, int receiveBufSize, Ticker ticker) @@ -151,20 +150,13 @@ public class NonBlockingSenderReceiver } } - try + try(Selector selector = _selector; SocketChannel channel = _socketChannel) { while(!doWrite()) { } - try - { - _receiver.closed(); - } - finally - { - _socketChannel.close(); - } + _receiver.closed(); } catch (IOException e) { @@ -224,19 +216,23 @@ public class NonBlockingSenderReceiver private void doRead() throws IOException { - ByteBuffer buffer; int remaining; do { - buffer = ByteBuffer.allocate(_receiveBufSize); - _socketChannel.read(buffer); - 
remaining = buffer.remaining(); + if(_currentBuffer == null || _currentBuffer.remaining() == 0) + { + _currentBuffer = ByteBuffer.allocate(_receiveBufSize); + } + _socketChannel.read(_currentBuffer); + remaining = _currentBuffer.remaining(); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Read " + buffer.position() + " byte(s)"); + LOGGER.debug("Read " + _currentBuffer.position() + " byte(s)"); } - buffer.flip(); - _receiver.received(buffer); + ByteBuffer dup = _currentBuffer.duplicate(); + dup.flip(); + _currentBuffer = _currentBuffer.slice(); + _receiver.received(dup); } while (remaining == 0); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org