Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=825362&r1=825361&r2=825362&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Oct 15 01:06:23 2009 @@ -20,11 +20,6 @@ */ package org.apache.qpid.client.protocol; -import org.apache.commons.lang.StringUtils; -import org.apache.mina.common.CloseFuture; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.WriteFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.commons.lang.StringUtils; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; @@ -65,10 +61,6 @@ protected static final String SASL_CLIENT = "SASLClient"; - protected final IoSession _minaProtocolSession; - - protected WriteFuture _lastWriteFuture; - /** * The handler from which this session was created and which is used to handle protocol events. We send failover * events to the handler. 
@@ -102,28 +94,15 @@ protected final AMQConnection _connection; - private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; + private ConnectionTuneParameters _connectionTuneParameters; - public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) - { - _protocolHandler = protocolHandler; - _minaProtocolSession = protocolSession; - _minaProtocolSession.setAttachment(this); - // properties of the connection are made available to the event handlers - _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection); - // fixme - real value needed - _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT); - _protocolVersion = connection.getProtocolVersion(); - _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), - this); - _connection = connection; + private SaslClient _saslClient; - } + private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0; public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection) { - _protocolHandler = protocolHandler; - _minaProtocolSession = null; + _protocolHandler = protocolHandler; _protocolVersion = connection.getProtocolVersion(); _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(), this); @@ -134,7 +113,7 @@ { // start the process of setting up the connection. This is the first place that // data is written to the server. 
- _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion())); + _protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion())); } public String getClientID() @@ -175,14 +154,9 @@ return getAMQConnection().getPassword(); } - public IoSession getIoSession() - { - return _minaProtocolSession; - } - public SaslClient getSaslClient() { - return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT); + return _saslClient; } /** @@ -192,28 +166,21 @@ */ public void setSaslClient(SaslClient client) { - if (client == null) - { - _minaProtocolSession.removeAttribute(SASL_CLIENT); - } - else - { - _minaProtocolSession.setAttribute(SASL_CLIENT, client); - } + _saslClient = client; } public ConnectionTuneParameters getConnectionTuneParameters() { - return (ConnectionTuneParameters) _minaProtocolSession.getAttribute(CONNECTION_TUNE_PARAMETERS); + return _connectionTuneParameters; } public void setConnectionTuneParameters(ConnectionTuneParameters params) { - _minaProtocolSession.setAttribute(CONNECTION_TUNE_PARAMETERS, params); + _connectionTuneParameters = params; AMQConnection con = getAMQConnection(); con.setMaximumChannelCount(params.getChannelMax()); con.setMaximumFrameSize(params.getFrameMax()); - initHeartbeats((int) params.getHeartbeat()); + _protocolHandler.initHeartbeats((int) params.getHeartbeat()); } /** @@ -335,21 +302,12 @@ */ public void writeFrame(AMQDataBlock frame) { - writeFrame(frame, false); + _protocolHandler.writeFrame(frame); } public void writeFrame(AMQDataBlock frame, boolean wait) { - WriteFuture f = _minaProtocolSession.write(frame); - if (wait) - { - // fixme -- time out? 
- f.join(); - } - else - { - _lastWriteFuture = f; - } + _protocolHandler.writeFrame(frame, wait); } /** @@ -407,33 +365,12 @@ public AMQConnection getAMQConnection() { - return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION); + return _connection; } - public void closeProtocolSession() + public void closeProtocolSession() throws AMQException { - closeProtocolSession(true); - } - - public void closeProtocolSession(boolean waitLast) - { - _logger.debug("Waiting for last write to join."); - if (waitLast && (_lastWriteFuture != null)) - { - _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); - } - - _logger.debug("Closing protocol session"); - - final CloseFuture future = _minaProtocolSession.close(); - - // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED - // then wait for the connection to close. - // ritchiem: Could this release BlockingWaiters to early? The close has been done as much as possible so any - // error now shouldn't matter. 
- - _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); - future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT); + _protocolHandler.closeConnection(0); } public void failover(String host, int port) @@ -449,22 +386,11 @@ id = _queueId++; } // get rid of / and : and ; from address for spec conformance - String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", ""); + String localAddress = StringUtils.replaceChars(_protocolHandler.getLocalAddress().toString(), "/;:", ""); return new AMQShortString("tmp_" + localAddress + "_" + id); } - /** @param delay delay in seconds (not ms) */ - void initHeartbeats(int delay) - { - if (delay > 0) - { - _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay); - _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.CONFIG.getTimeout(delay)); - HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); - } - } - public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag) { final AMQSession session = getSession(channelId); @@ -530,7 +456,7 @@ public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException { - _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession); + _protocolHandler.methodBodyReceived(channel, amqMethodBody); } public void notifyError(Exception error)
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java?rev=825362&r1=825361&r2=825362&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java Thu Oct 15 01:06:23 2009 @@ -20,27 +20,20 @@ */ package org.apache.qpid.client.transport; +import java.io.IOException; +import java.net.InetSocketAddress; + import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoConnector; import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.mina.transport.socket.nio.ExistingSocketConnector; -import org.apache.mina.transport.socket.nio.SocketConnectorConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; - +import org.apache.qpid.client.SSLConfiguration; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.ReadWriteThreadModel; - +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - public class SocketTransportConnection implements ITransportConnection { private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class); @@ -71,61 +64,27 @@ } final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector(); - SocketConnectorConfig cfg = 
(SocketConnectorConfig) ioConnector.getDefaultConfig(); - - // if we do not use our own thread model we get the MINA default which is to use - // its own leader-follower model - boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool"); - if (readWriteThreading) - { - cfg.setThreadModel(ReadWriteThreadModel.getInstance()); - } - - SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); - scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true"))); - scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE)); - _logger.info("send-buffer-size = " + scfg.getSendBufferSize()); - scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE)); - _logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize()); - final InetSocketAddress address; if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET)) { address = null; - - Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost()); - - if (socket != null) - { - _logger.info("Using existing Socket:" + socket); - - ((ExistingSocketConnector) ioConnector).setOpenSocket(socket); - } - else - { - throw new IllegalArgumentException("Active Socket must be provided for broker " + - "with 'socket://<SocketID>' transport:" + brokerDetail); - } } else { address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort()); _logger.info("Attempting connection to " + address); } - - - ConnectFuture future = ioConnector.connect(address, protocolHandler); - - // wait for connection to complete - if (future.join(brokerDetail.getTimeout())) - { - // we call getSession which throws an IOException if there has been an error connecting - future.getSession(); - } - else - { - throw new IOException("Timeout waiting for connection."); - } + + SSLConfiguration sslConfig = protocolHandler.getConnection().getSSLConfiguration(); + SSLContextFactory sslFactory = null; + if (sslConfig != 
null) + { + sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); + } + + MINANetworkDriver driver = new MINANetworkDriver(ioConnector); + driver.open(brokerDetail.getPort(), address.getAddress(), protocolHandler, null, sslFactory); + protocolHandler.setNetworkDriver(driver); } } Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=825362&r1=825361&r2=825362&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Thu Oct 15 01:06:23 2009 @@ -20,6 +20,12 @@ */ package org.apache.qpid.client.transport; +import java.io.IOException; +import java.net.Socket; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoServiceConfig; @@ -30,16 +36,12 @@ import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.thread.QpidThreadExecutor; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.net.Socket; - /** * The TransportConnection is a helper class responsible for 
connecting to an AMQ server. It sets up the underlying * connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA @@ -61,7 +63,7 @@ private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class); - private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler"; + private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQProtocolEngineFactory"; private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>(); @@ -75,7 +77,7 @@ return _openSocketRegister.remove(socketID); } - public static synchronized ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException + public static synchronized ITransportConnection getInstance(final BrokerDetails details) throws AMQTransportConnectionException { int transport = getTransport(details.getTransport()); @@ -91,7 +93,22 @@ { public IoConnector newSocketConnector() { - return new ExistingSocketConnector(1,new QpidThreadExecutor()); + ExistingSocketConnector connector = new ExistingSocketConnector(1,new QpidThreadExecutor()); + + Socket socket = TransportConnection.removeOpenSocket(details.getHost()); + + if (socket != null) + { + _logger.info("Using existing Socket:" + socket); + + ((ExistingSocketConnector) connector).setOpenSocket(socket); + } + else + { + throw new IllegalArgumentException("Active Socket must be provided for broker " + + "with 'socket://<SocketID>' transport:" + details); + } + return connector; } }); case TCP: @@ -189,8 +206,6 @@ _acceptor = new VmPipeAcceptor(); IoServiceConfig config = _acceptor.getDefaultConfig(); - - config.setThreadModel(ReadWriteThreadModel.getInstance()); } synchronized (_inVmPipeAddress) { @@ -275,7 +290,10 @@ { Class[] cnstr = {Integer.class}; Object[] params = {port}; - provider = (IoHandlerAdapter) 
Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); + + provider = new MINANetworkDriver(); + ProtocolEngineFactory engineFactory = (ProtocolEngineFactory) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); + ((MINANetworkDriver) provider).setProtocolEngineFactory(engineFactory, true); // Give the broker a second to create _logger.info("Created VMBroker Instance:" + port); } Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java?rev=825362&r1=825361&r2=825362&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java Thu Oct 15 01:06:23 2009 @@ -20,25 +20,26 @@ */ package org.apache.qpid.client.transport; +import java.io.IOException; + import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.IoServiceConfig; import org.apache.mina.transport.vmpipe.QpidVmPipeConnector; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.mina.transport.vmpipe.VmPipeConnector; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.pool.ReadWriteThreadModel; +import org.apache.qpid.transport.network.mina.MINANetworkDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - public class VmPipeTransportConnection implements ITransportConnection { private static final Logger _logger = LoggerFactory.getLogger(VmPipeTransportConnection.class); private static int _port; + private MINANetworkDriver _networkDriver; + public 
VmPipeTransportConnection(int port) { _port = port; @@ -47,16 +48,16 @@ public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException { final VmPipeConnector ioConnector = new QpidVmPipeConnector(); - final IoServiceConfig cfg = ioConnector.getDefaultConfig(); - - cfg.setThreadModel(ReadWriteThreadModel.getInstance()); final VmPipeAddress address = new VmPipeAddress(_port); _logger.info("Attempting connection to " + address); - ConnectFuture future = ioConnector.connect(address, protocolHandler); + _networkDriver = new MINANetworkDriver(ioConnector, protocolHandler); + protocolHandler.setNetworkDriver(_networkDriver); + ConnectFuture future = ioConnector.connect(address, _networkDriver); // wait for connection to complete future.join(); // we call getSession which throws an IOException if there has been an error connecting future.getSession(); + _networkDriver.setProtocolEngine(protocolHandler); } } Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java?rev=825362&r1=825361&r2=825362&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java (original) +++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java Thu Oct 15 01:06:23 2009 @@ -27,6 +27,7 @@ import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.transport.TestNetworkDriver; import org.apache.qpid.client.MockAMQConnection; import org.apache.qpid.client.AMQAuthenticationException; import org.apache.qpid.client.state.AMQState; @@ -72,9 +73,7 @@ { //Create a new ProtocolHandler with a 
fake connection. _handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'")); - - _handler.sessionCreated(new MockIoSession()); - + _handler.setNetworkDriver(new TestNetworkDriver()); AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1); _blockFrame = new AMQFrame(0, body); Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java?rev=825362&r1=825361&r2=825362&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java Thu Oct 15 01:06:23 2009 @@ -23,6 +23,7 @@ import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolEncoder; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; /** * AMQCodecFactory is a Mina codec factory. It supplies the encoders and decoders need to read and write the bytes to @@ -50,9 +51,9 @@ * @param expectProtocolInitiation <tt>true</tt> if the first frame received is going to be a protocol initiation * frame, <tt>false</tt> if it is going to be a standard AMQ data block. */ - public AMQCodecFactory(boolean expectProtocolInitiation) + public AMQCodecFactory(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session) { - _frameDecoder = new AMQDecoder(expectProtocolInitiation); + _frameDecoder = new AMQDecoder(expectProtocolInitiation, session); } /** @@ -70,7 +71,7 @@ * * @return The AMQP decoder. 
*/ - public ProtocolDecoder getDecoder() + public AMQDecoder getDecoder() { return _frameDecoder; } Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=825362&r1=825361&r2=825362&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Thu Oct 15 01:06:23 2009 @@ -20,14 +20,21 @@ */ package org.apache.qpid.codec; +import java.util.ArrayList; + import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoSession; import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; +import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQDataBlockDecoder; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQMethodBodyFactory; +import org.apache.qpid.framing.AMQProtocolVersionException; import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; /** * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a @@ -62,14 +69,19 @@ private boolean _expectProtocolInitiation; private boolean firstDecode = true; + private AMQMethodBodyFactory _bodyFactory; + + private ByteBuffer _remainingBuf; + /** * Creates a new AMQP decoder. * * @param expectProtocolInitiation <tt>true</tt> if this decoder needs to handle protocol initiation. 
*/ - public AMQDecoder(boolean expectProtocolInitiation) + public AMQDecoder(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session) { _expectProtocolInitiation = expectProtocolInitiation; + _bodyFactory = new AMQMethodBodyFactory(session); } /** @@ -120,7 +132,7 @@ protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { int pos = in.position(); - boolean enoughData = _dataBlockDecoder.decodable(session, in); + boolean enoughData = _dataBlockDecoder.decodable(in.buf()); in.position(pos); if (!enoughData) { @@ -149,7 +161,7 @@ */ private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { - boolean enoughData = _piDecoder.decodable(session, in); + boolean enoughData = _piDecoder.decodable(in.buf()); if (!enoughData) { // returning false means it will leave the contents in the buffer and @@ -158,7 +170,8 @@ } else { - _piDecoder.decode(session, in, out); + ProtocolInitiation pi = new ProtocolInitiation(in.buf()); + out.write(pi); return true; } @@ -177,7 +190,7 @@ } - /** + /** * Cumulates content of <tt>in</tt> into internal buffer and forwards * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}. 
* <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt> @@ -268,4 +281,60 @@ session.setAttribute( BUFFER, remainingBuf ); } + public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException + { + + // get prior remaining data from accumulator + ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>(); + ByteBuffer msg; + // if we have a session buffer, append data to that otherwise + // use the buffer read from the network directly + if( _remainingBuf != null ) + { + _remainingBuf.put(buf); + _remainingBuf.flip(); + msg = _remainingBuf; + } + else + { + msg = ByteBuffer.wrap(buf); + } + + if (_expectProtocolInitiation + || (firstDecode + && (msg.remaining() > 0) + && (msg.get(msg.position()) == (byte)'A'))) + { + if (_piDecoder.decodable(msg.buf())) + { + dataBlocks.add(new ProtocolInitiation(msg.buf())); + } + } + else + { + boolean enoughData = true; + while (enoughData) + { + int pos = msg.position(); + + enoughData = _dataBlockDecoder.decodable(msg); + msg.position(pos); + if (enoughData) + { + dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg)); + } + else + { + _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false); + _remainingBuf.setAutoExpand(true); + _remainingBuf.put(msg); + } + } + } + if(firstDecode && dataBlocks.size() > 0) + { + firstDecode = false; + } + return dataBlocks; + } } Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?rev=825362&r1=825361&r2=825362&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original) +++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Thu Oct 15 01:06:23 2009 @@ -47,7 +47,7 @@ public AMQDataBlockDecoder() { } - public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException + public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException { final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1); // type, channel, body length and end byte @@ -56,14 +56,15 @@ return false; } - in.skip(1 + 2); - final long bodySize = in.getUnsignedInt(); + in.position(in.position() + 1 + 2); + // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() + final long bodySize = in.getInt() & 0xffffffffL; return (remainingAfterAttributes >= bodySize); } - protected Object createAndPopulateFrame(IoSession session, ByteBuffer in) + public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in) throws AMQFrameDecodingException, AMQProtocolVersionException { final byte type = in.get(); @@ -71,15 +72,7 @@ BodyFactory bodyFactory; if (type == AMQMethodBody.TYPE) { - bodyFactory = (BodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY); - if (bodyFactory == null) - { - AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment(); - bodyFactory = new AMQMethodBodyFactory(protocolSession); - session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory); - - } - + bodyFactory = methodBodyFactory; } else { @@ -115,6 +108,24 @@ public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { - out.write(createAndPopulateFrame(session, in)); + AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY); + if (bodyFactory == null) + { + AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment(); + bodyFactory = new AMQMethodBodyFactory(protocolSession); + 
session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory); + } + + out.write(createAndPopulateFrame(bodyFactory, in)); + } + + public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException + { + return decodable(msg.buf()); + } + + public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException + { + return createAndPopulateFrame(factory, ByteBuffer.wrap(msg)); } } Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java?rev=825362&r1=825361&r2=825362&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java Thu Oct 15 01:06:23 2009 @@ -20,12 +20,10 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoSession; -import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.qpid.AMQException; import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock { @@ -53,13 +51,12 @@ _protocolMajor = protocolMajor; _protocolMinor = protocolMinor; } - + public ProtocolInitiation(ProtocolVersion pv) { this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion()); } - public ProtocolInitiation(ByteBuffer in) { _protocolHeader = new byte[4]; @@ -71,6 +68,11 @@ _protocolMinor = in.get(); } + public void writePayload(org.apache.mina.common.ByteBuffer buffer) + { + writePayload(buffer.buf()); + } + public long getSize() { return 4 + 1 + 1 + 1 + 1; @@ -127,16 
+129,11 @@ * @return true if we have enough data to decode the PI frame fully, false if more * data is required */ - public boolean decodable(IoSession session, ByteBuffer in) + public boolean decodable(ByteBuffer in) { return (in.remaining() >= 8); } - public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) - { - ProtocolInitiation pi = new ProtocolInitiation(in); - out.write(pi); - } } public ProtocolVersion checkVersion() throws AMQException @@ -192,4 +189,5 @@ buffer.append(Integer.toHexString(_protocolMinor)); return buffer.toString(); } + } Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java?rev=825362&r1=825361&r2=825362&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java Thu Oct 15 01:06:23 2009 @@ -21,9 +21,12 @@ package org.apache.qpid.pool; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.mina.common.IoSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A Job is a continuation that batches together other continuations, specifically {@link Event}s, into one continuation. @@ -52,35 +55,28 @@ */ public class Job implements ReadWriteRunnable { + + /** Defines the maximum number of events that will be batched into a single job. */ + public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */ private final int _maxEvents; - /** The Mina session. 
*/ - private final IoSession _session; - /** Holds the queue of events that make up the job. */ - private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>(); + private final java.util.Queue<Runnable> _eventQueue = new ConcurrentLinkedQueue<Runnable>(); /** Holds a status flag, that indicates when the job is actively running. */ private final AtomicBoolean _active = new AtomicBoolean(); - /** Holds the completion continuation, called upon completion of a run of the job. */ - private final JobCompletionHandler _completionHandler; - private final boolean _readJob; - /** - * Creates a new job that aggregates many continuations together. - * - * @param session The Mina session. - * @param completionHandler The per job run, terminal continuation. - * @param maxEvents The maximum number of aggregated continuations to process per run of the job. - * @param readJob - */ - Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob) + private ReferenceCountingExecutorService _poolReference; + + private final static Logger _logger = LoggerFactory.getLogger(Job.class); + + public Job(ReferenceCountingExecutorService poolReference, int maxEvents, boolean readJob) { - _session = session; - _completionHandler = completionHandler; + _poolReference = poolReference; _maxEvents = maxEvents; _readJob = readJob; } @@ -90,7 +86,7 @@ * * @param evt The continuation to enqueue. 
*/ - void add(Event evt) + public void add(Runnable evt) { _eventQueue.add(evt); } @@ -104,14 +100,14 @@ int i = _maxEvents; while( --i != 0 ) { - Event e = _eventQueue.poll(); + Runnable e = _eventQueue.poll(); if (e == null) { return true; } else { - e.process(_session); + e.run(); } } return false; @@ -153,40 +149,105 @@ if(processAll()) { deactivate(); - _completionHandler.completed(_session, this); + completed(); } else { - _completionHandler.notCompleted(_session, this); + notCompleted(); } } - public boolean isReadJob() - { - return _readJob; - } - public boolean isRead() { return _readJob; } - - public boolean isWrite() + + /** + * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running. + * + * @param job The job. + * @param event The event to hand off asynchronously. + */ + public static void fireAsynchEvent(ExecutorService pool, Job job, Runnable event) { - return !_readJob; - } + job.add(event); + + + if(pool == null) + { + return; + } + + // rather than perform additional checks on pool to check that it hasn't shutdown. + // catch the RejectedExecutionException that will result from executing on a shutdown pool + if (job.activate()) + { + try + { + pool.execute(job); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + } + } + /** - * Another interface for a continuation. + * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing + * of a batch of events this is called. This method simply re-activates the job, if it has more events to process. * - * @todo Get rid of this interface as there are other interfaces that could be used instead, such as FutureTask, - * Runnable or a custom Continuation interface. + * @param session The Mina session to work in. + * @param job The job that completed. 
*/ - static interface JobCompletionHandler + public void completed() + { + if (!isComplete()) + { + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) + { + return; + } + + + // ritchiem : 2006-12-13 Do we need to perform the additional checks here? + // Can the pool be shutdown at this point? + if (activate()) + { + try + { + pool.execute(this); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + + } + } + } + + public void notCompleted() { - public void completed(IoSession session, Job job); + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) + { + return; + } - public void notCompleted(final IoSession session, final Job job); + try + { + pool.execute(this); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } } + } Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java?rev=825362&r1=825361&r2=825362&view=diff ============================================================================== --- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java (original) +++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java Thu Oct 15 01:06:23 2009 @@ -23,5 +23,4 @@ public interface ReadWriteRunnable extends Runnable { boolean isRead(); - boolean isWrite(); } Modified: qpid/trunk/qpid/java/systests/build.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/build.xml?rev=825362&r1=825361&r2=825362&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/build.xml (original) +++ qpid/trunk/qpid/java/systests/build.xml Thu Oct 15 01:06:23 2009 @@ -20,7 +20,7 @@ --> <project name="System Tests" 
default="build"> - <property name="module.depends" value="client management/tools/qpid-cli management/eclipse-plugin management/common broker broker/test common junit-toolkit"/> + <property name="module.depends" value="client management/tools/qpid-cli management/eclipse-plugin management/common broker broker/test common common/test nt junit-toolkit"/> <property name="module.test.src" location="src/main/java"/> <property name="module.test.excludes" value="**/TTLTest.java,**/DropInTest.java,**/TestClientControlledTest.java"/> Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java?rev=825362&r1=825361&r2=825362&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java (original) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failover/MessageDisappearWithIOExceptionTest.java Thu Oct 15 01:06:23 2009 @@ -195,18 +195,12 @@ // Send IO Exception - causing failover _connection.getProtocolHandler(). 
- exceptionCaught(_connection.getProtocolHandler().getProtocolSession().getIoSession(), - new WriteTimeoutException("WriteTimeoutException to cause failover.")); + exception(new WriteTimeoutException("WriteTimeoutException to cause failover.")); // Verify Failover occured through ConnectionListener assertTrue("Failover did not occur", _failoverOccured.await(4000, TimeUnit.MILLISECONDS)); - //Verify new protocolSession is not the same as the original - assertNotSame("Protocol Session has not changed", - protocolSession, - _connection.getProtocolHandler().getProtocolSession()); - /***********************************/ // This verifies that the bug has been resolved Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java?rev=825362&r1=825361&r2=825362&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java (original) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java Thu Oct 15 01:06:23 2009 @@ -630,9 +630,9 @@ //around the connection close race during tearDown() causing sporadic failures final CountDownLatch exceptionReceived = new CountDownLatch(1); + Connection conn = getConnection("server", "guest"); try { - Connection conn = getConnection("server", "guest"); conn.setExceptionListener(new ExceptionListener() { @@ -649,7 +649,6 @@ session.createTemporaryQueue(); fail("Test failed as creation succeded."); - //conn will be automatically closed } catch (JMSException e) { @@ -664,6 +663,17 @@ assertTrue("Timed out waiting for conneciton to report close", exceptionReceived.await(2, TimeUnit.SECONDS)); } + finally + { + try + { + conn.close(); + } + catch (Exception e) + { + // This normally fails because 
we are denied + } + } } public void testServerCreateAutoDeleteQueueInvalid() throws NamingException, JMSException, AMQException, Exception Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java?rev=825362&r1=825361&r2=825362&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java (original) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java Thu Oct 15 01:06:23 2009 @@ -20,27 +20,27 @@ */ package org.apache.qpid.test.unit.client.protocol; -import org.apache.mina.common.IoSession; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.test.utils.protocol.TestIoSession; +import org.apache.qpid.transport.TestNetworkDriver; +import org.apache.qpid.transport.NetworkDriver; public class AMQProtocolSessionTest extends QpidTestCase { private static class AMQProtSession extends AMQProtocolSession { - public AMQProtSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) + public AMQProtSession(AMQProtocolHandler protocolHandler, AMQConnection connection) { - super(protocolHandler,protocolSession,connection); + super(protocolHandler,connection); } - public TestIoSession getMinaProtocolSession() + public TestNetworkDriver getNetworkDriver() { - return (TestIoSession) _minaProtocolSession; + return (TestNetworkDriver) _protocolHandler.getNetworkDriver(); } public AMQShortString 
genQueueName() @@ -63,8 +63,11 @@ { super.setUp(); + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); + AMQProtocolHandler protocolHandler = new AMQProtocolHandler(con); + protocolHandler.setNetworkDriver(new TestNetworkDriver()); //don't care about the values set here apart from the dummy IoSession - _testSession = new AMQProtSession(null,new TestIoSession(), (AMQConnection) getConnection("guest", "guest")); + _testSession = new AMQProtSession(protocolHandler , con); //initialise addresses for test and expected results _port = 123; @@ -81,20 +84,20 @@ AMQShortString testAddress; //test address with / and ; chars which generateQueueName should removeKey - _testSession.getMinaProtocolSession().setStringLocalAddress(_brokenAddress); - _testSession.getMinaProtocolSession().setLocalPort(_port); + _testSession.getNetworkDriver().setLocalAddress(_brokenAddress); + _testSession.getNetworkDriver().setPort(_port); testAddress = _testSession.genQueueName(); assertEquals("Failure when generating a queue exchange from an address with special chars",_generatedAddress,testAddress.toString()); //test empty address - _testSession.getMinaProtocolSession().setStringLocalAddress(_emptyAddress); + _testSession.getNetworkDriver().setLocalAddress(_emptyAddress); testAddress = _testSession.genQueueName(); assertEquals("Failure when generating a queue exchange from an empty address",_generatedAddress_2,testAddress.toString()); //test address with no special chars - _testSession.getMinaProtocolSession().setStringLocalAddress(_validAddress); + _testSession.getNetworkDriver().setLocalAddress(_validAddress); testAddress = _testSession.genQueueName(); assertEquals("Failure when generating a queue exchange from an address with no special chars",_generatedAddress_3,testAddress.toString()); Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java?rev=825362&r1=825361&r2=825362&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java (original) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java Thu Oct 15 01:06:23 2009 @@ -960,7 +960,7 @@ return (AMQConnectionFactory) getInitialContext().lookup(factoryName); } - public Connection getConnection() throws Exception + public Connection getConnection() throws JMSException, NamingException { return getConnection("guest", "guest"); } --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org