Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=810110&r1=810109&r2=810110&view=diff ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original) +++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Tue Sep 1 16:27:52 2009 @@ -20,14 +20,21 @@ */ package org.apache.qpid.codec; +import java.util.ArrayList; + import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoSession; import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; +import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQDataBlockDecoder; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQMethodBodyFactory; +import org.apache.qpid.framing.AMQProtocolVersionException; import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; /** * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a @@ -62,14 +69,19 @@ private boolean _expectProtocolInitiation; private boolean firstDecode = true; + private AMQMethodBodyFactory _bodyFactory; + + private ByteBuffer _remainingBuf; + /** * Creates a new AMQP decoder. * * @param expectProtocolInitiation <tt>true</tt> if this decoder needs to handle protocol initiation. 
*/ - public AMQDecoder(boolean expectProtocolInitiation) + public AMQDecoder(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session) { _expectProtocolInitiation = expectProtocolInitiation; + _bodyFactory = new AMQMethodBodyFactory(session); } /** @@ -120,7 +132,7 @@ protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { int pos = in.position(); - boolean enoughData = _dataBlockDecoder.decodable(session, in); + boolean enoughData = _dataBlockDecoder.decodable(in.buf()); in.position(pos); if (!enoughData) { @@ -149,7 +161,7 @@ */ private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { - boolean enoughData = _piDecoder.decodable(session, in); + boolean enoughData = _piDecoder.decodable(in.buf()); if (!enoughData) { // returning false means it will leave the contents in the buffer and @@ -158,7 +170,8 @@ } else { - _piDecoder.decode(session, in, out); + ProtocolInitiation pi = new ProtocolInitiation(in.buf()); + out.write(pi); return true; } @@ -177,7 +190,7 @@ } - /** + /** * Cumulates content of <tt>in</tt> into internal buffer and forwards * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}. 
* <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt> @@ -268,4 +281,60 @@ session.setAttribute( BUFFER, remainingBuf ); } + public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException + { + + // get prior remaining data from accumulator + ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>(); + ByteBuffer msg; + // if we have a session buffer, append data to that otherwise + // use the buffer read from the network directly + if( _remainingBuf != null ) + { + _remainingBuf.put(buf); + _remainingBuf.flip(); + msg = _remainingBuf; + } + else + { + msg = ByteBuffer.wrap(buf); + } + + if (_expectProtocolInitiation + || (firstDecode + && (msg.remaining() > 0) + && (msg.get(msg.position()) == (byte)'A'))) + { + if (_piDecoder.decodable(msg.buf())) + { + dataBlocks.add(new ProtocolInitiation(msg.buf())); + } + } + else + { + boolean enoughData = true; + while (enoughData) + { + int pos = msg.position(); + + enoughData = _dataBlockDecoder.decodable(msg); + msg.position(pos); + if (enoughData) + { + dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg)); + } + else + { + _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false); + _remainingBuf.setAutoExpand(true); + _remainingBuf.put(msg); + } + } + } + if(firstDecode && dataBlocks.size() > 0) + { + firstDecode = false; + } + return dataBlocks; + } }
Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?rev=810110&r1=810109&r2=810110&view=diff ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original) +++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Tue Sep 1 16:27:52 2009 @@ -47,7 +47,7 @@ public AMQDataBlockDecoder() { } - public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException + public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException { final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1); // type, channel, body length and end byte @@ -56,14 +56,15 @@ return false; } - in.skip(1 + 2); - final long bodySize = in.getUnsignedInt(); + in.position(in.position() + 1 + 2); + // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() + final long bodySize = in.getInt() & 0xffffffffL; return (remainingAfterAttributes >= bodySize); } - protected Object createAndPopulateFrame(IoSession session, ByteBuffer in) + public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in) throws AMQFrameDecodingException, AMQProtocolVersionException { final byte type = in.get(); @@ -71,15 +72,7 @@ BodyFactory bodyFactory; if (type == AMQMethodBody.TYPE) { - bodyFactory = (BodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY); - if (bodyFactory == null) - { - AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment(); - bodyFactory = new AMQMethodBodyFactory(protocolSession); - session.setAttribute(SESSION_METHOD_BODY_FACTORY, 
bodyFactory); - - } - + bodyFactory = methodBodyFactory; } else { @@ -115,6 +108,24 @@ public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { - out.write(createAndPopulateFrame(session, in)); + AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY); + if (bodyFactory == null) + { + AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment(); + bodyFactory = new AMQMethodBodyFactory(protocolSession); + session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory); + } + + out.write(createAndPopulateFrame(bodyFactory, in)); + } + + public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException + { + return decodable(msg.buf()); + } + + public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException + { + return createAndPopulateFrame(factory, ByteBuffer.wrap(msg)); } } Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java?rev=810110&r1=810109&r2=810110&view=diff ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java (original) +++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java Tue Sep 1 16:27:52 2009 @@ -50,7 +50,7 @@ { _logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'"); } - + out.write(buffer); } Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java URL: 
http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java?rev=810110&r1=810109&r2=810110&view=diff ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java (original) +++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java Tue Sep 1 16:27:52 2009 @@ -20,12 +20,10 @@ */ package org.apache.qpid.framing; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoSession; -import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.qpid.AMQException; import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock { @@ -53,13 +51,12 @@ _protocolMajor = protocolMajor; _protocolMinor = protocolMinor; } - + public ProtocolInitiation(ProtocolVersion pv) { this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion()); } - public ProtocolInitiation(ByteBuffer in) { _protocolHeader = new byte[4]; @@ -71,6 +68,11 @@ _protocolMinor = in.get(); } + public void writePayload(org.apache.mina.common.ByteBuffer buffer) + { + writePayload(buffer.buf()); + } + public long getSize() { return 4 + 1 + 1 + 1 + 1; @@ -127,16 +129,11 @@ * @return true if we have enough data to decode the PI frame fully, false if more * data is required */ - public boolean decodable(IoSession session, ByteBuffer in) + public boolean decodable(ByteBuffer in) { return (in.remaining() >= 8); } - public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) - { - ProtocolInitiation pi = new ProtocolInitiation(in); - out.write(pi); - } } public ProtocolVersion checkVersion() throws AMQException @@ -192,4 +189,5 @@ 
buffer.append(Integer.toHexString(_protocolMinor)); return buffer.toString(); } + } Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java?rev=810110&r1=810109&r2=810110&view=diff ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java (original) +++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java Tue Sep 1 16:27:52 2009 @@ -45,21 +45,31 @@ * a continuation. Job is also a continuation, as is the job completion handler. Or, as Event is totally abstract, * it is really an interface, so could just drop it and use the continuation interface instead. */ -public abstract class Event +public class Event { + private Runnable _runner; + + public Event() + { + + } + /** * Creates a continuation. */ - public Event() - { } + public Event(Runnable runner) + { + _runner = runner; + } /** - * Processes the continuation in the context of a Mina session. - * - * @param session The Mina session. + * Processes the continuation */ - public abstract void process(IoSession session); - + public void process() + { + _runner.run(); + } + /** * A continuation ({@link Event}) that takes a Mina messageReceived event, and passes it to a NextFilter. * @@ -68,22 +78,22 @@ * <tr><td> Pass a Mina messageReceived event to a NextFilter. 
<td> {@link IoFilter.NextFilter}, {@link IoSession} * </table> */ - public static final class ReceivedEvent extends Event + public static final class MinaReceivedEvent extends Event { private final Object _data; - private final IoFilter.NextFilter _nextFilter; + private final IoSession _session; - public ReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data) + public MinaReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data, final IoSession session) { - super(); _nextFilter = nextFilter; _data = data; + _session = session; } - public void process(IoSession session) + public void process() { - _nextFilter.messageReceived(session, _data); + _nextFilter.messageReceived(_session, _data); } public IoFilter.NextFilter getNextFilter() @@ -101,21 +111,22 @@ * <td> {@link IoFilter.NextFilter}, {@link IoFilter.WriteRequest}, {@link IoSession} * </table> */ - public static final class WriteEvent extends Event + public static final class MinaWriteEvent extends Event { private final IoFilter.WriteRequest _data; private final IoFilter.NextFilter _nextFilter; + private IoSession _session; - public WriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data) + public MinaWriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data, final IoSession session) { - super(); _nextFilter = nextFilter; _data = data; + _session = session; } - public void process(IoSession session) + public void process() { - _nextFilter.filterWrite(session, _data); + _nextFilter.filterWrite(_session, _data); } public IoFilter.NextFilter getNextFilter() @@ -135,16 +146,17 @@ public static final class CloseEvent extends Event { private final IoFilter.NextFilter _nextFilter; + private final IoSession _session; - public CloseEvent(final IoFilter.NextFilter nextFilter) + public CloseEvent(final IoFilter.NextFilter nextFilter, final IoSession session) { - super(); _nextFilter = nextFilter; + _session = session; } - public 
void process(IoSession session) + public void process() { - _nextFilter.sessionClosed(session); + _nextFilter.sessionClosed(_session); } public IoFilter.NextFilter getNextFilter() Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java?rev=810110&r1=810109&r2=810110&view=diff ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java (original) +++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java Tue Sep 1 16:27:52 2009 @@ -55,9 +55,6 @@ /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */ private final int _maxEvents; - /** The Mina session. */ - private final IoSession _session; - /** Holds the queue of events that make up the job. */ private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>(); @@ -79,7 +76,13 @@ */ Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob) { - _session = session; + _completionHandler = completionHandler; + _maxEvents = maxEvents; + _readJob = readJob; + } + + public Job(JobCompletionHandler completionHandler, int maxEvents, boolean readJob) + { _completionHandler = completionHandler; _maxEvents = maxEvents; _readJob = readJob; @@ -90,7 +93,7 @@ * * @param evt The continuation to enqueue. 
*/ - void add(Event evt) + public void add(Event evt) { _eventQueue.add(evt); } @@ -111,7 +114,7 @@ } else { - e.process(_session); + e.process(); } } return false; @@ -153,30 +156,19 @@ if(processAll()) { deactivate(); - _completionHandler.completed(_session, this); + _completionHandler.completed(this); } else { - _completionHandler.notCompleted(_session, this); + _completionHandler.notCompleted(this); } } - public boolean isReadJob() - { - return _readJob; - } - public boolean isRead() { return _readJob; } - public boolean isWrite() - { - return !_readJob; - } - - /** * Another interface for a continuation. * @@ -185,8 +177,8 @@ */ static interface JobCompletionHandler { - public void completed(IoSession session, Job job); + public void completed(Job job); - public void notCompleted(final IoSession session, final Job job); + public void notCompleted(final Job job); } } Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java?rev=810110&r1=810109&r2=810110&view=diff ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java (original) +++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java Tue Sep 1 16:27:52 2009 @@ -20,19 +20,17 @@ */ package org.apache.qpid.pool; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; + import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoFilterAdapter; import org.apache.mina.common.IoSession; import org.apache.qpid.pool.Event.CloseEvent; - +import 
org.apache.qpid.pool.Event.MinaReceivedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ExecutorService; - /** * PoolingFilter, is a no-op pass through filter that hands all events down the Mina filter chain by default. As it * adds no behaviour by default to the filter chain, it is abstract. @@ -74,7 +72,7 @@ private final String _name; /** Defines the maximum number of events that will be batched into a single job. */ - static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); private final int _maxEvents; @@ -188,7 +186,7 @@ Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter); session.setAttribute(_name, job); } - + /** * Retrieves this filters Job, by this filters name, from the Mina session. * @@ -208,7 +206,7 @@ * @param session The Mina session to work in. * @param job The job that completed. 
*/ - public void completed(IoSession session, Job job) + public void completed(Job job) { @@ -239,7 +237,7 @@ } } - public void notCompleted(IoSession session, Job job) + public void notCompleted(Job job) { final ExecutorService pool = _poolReference.getPool(); @@ -430,7 +428,7 @@ public void messageReceived(NextFilter nextFilter, final IoSession session, Object message) { Job job = getJobForSession(session); - fireAsynchEvent(job, new Event.ReceivedEvent(nextFilter, message)); + fireAsynchEvent(job, new MinaReceivedEvent(nextFilter, message, session)); } /** @@ -442,7 +440,7 @@ public void sessionClosed(final NextFilter nextFilter, final IoSession session) { Job job = getJobForSession(session); - fireAsynchEvent(job, new CloseEvent(nextFilter)); + fireAsynchEvent(job, new CloseEvent(nextFilter, session)); } } @@ -473,7 +471,7 @@ public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest) { Job job = getJobForSession(session); - fireAsynchEvent(job, new Event.WriteEvent(nextFilter, writeRequest)); + fireAsynchEvent(job, new Event.MinaWriteEvent(nextFilter, writeRequest, session)); } /** @@ -485,7 +483,8 @@ public void sessionClosed(final NextFilter nextFilter, final IoSession session) { Job job = getJobForSession(session); - fireAsynchEvent(job, new CloseEvent(nextFilter)); + fireAsynchEvent(job, new CloseEvent(nextFilter, session)); } } + } Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java?rev=810110&r1=810109&r2=810110&view=diff ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java (original) +++ 
qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java Tue Sep 1 16:27:52 2009 @@ -23,5 +23,4 @@ public interface ReadWriteRunnable extends Runnable { boolean isRead(); - boolean isWrite(); } Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java?rev=810110&r1=810109&r2=810110&view=diff ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java (original) +++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java Tue Sep 1 16:27:52 2009 @@ -20,10 +20,12 @@ */ package org.apache.qpid.protocol; +import org.apache.qpid.transport.NetworkDriver; + public interface ProtocolEngineFactory { // Returns a new instance of a ProtocolEngine - ProtocolEngine newProtocolEngine(); + ProtocolEngine newProtocolEngine(NetworkDriver networkDriver); } \ No newline at end of file Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java?rev=810110&r1=810109&r2=810110&view=diff ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java (original) +++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java Tue Sep 1 16:27:52 2009 @@ -28,17 +28,17 @@ public 
interface NetworkDriverConfiguration { // Taken from Socket - boolean getKeepAlive(); - boolean getOOBInline(); - boolean getReuseAddress(); + Boolean getKeepAlive(); + Boolean getOOBInline(); + Boolean getReuseAddress(); Integer getSoLinger(); // null means off - int getSoTimeout(); - boolean getTcpNoDelay(); - int getTrafficClass(); + Integer getSoTimeout(); + Boolean getTcpNoDelay(); + Integer getTrafficClass(); // The amount of memory in bytes to allocate to the incoming buffer - int getReceiveBufferSize(); + Integer getReceiveBufferSize(); // The amount of memory in bytes to allocate to the outgoing buffer - int getSendBufferSize(); + Integer getSendBufferSize(); } Modified: qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java?rev=810110&r1=810109&r2=810110&view=diff ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java (original) +++ qpid/branches/java-network-refactor/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java Tue Sep 1 16:27:52 2009 @@ -181,6 +181,7 @@ { return _ioSession.getLocalAddress(); } + public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, SSLEngine sslEngine) throws OpenException @@ -251,6 +252,10 @@ public void close() { + if (_lastWriteFuture != null) + { + _lastWriteFuture.join(); + } if (_acceptor != null) { _acceptor.unbindAll(); @@ -359,9 +364,14 @@ protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); } - + + if (_ioSession == null) + { + _ioSession = protocolSession; + } + // Set up the protocol engine - ProtocolEngine 
protocolEngine = _factory.newProtocolEngine(); + ProtocolEngine protocolEngine = _factory.newProtocolEngine(this); MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession); protocolEngine.setNetworkDriver(newDriver); protocolSession.setAttachment(protocolEngine); @@ -385,4 +395,10 @@ return _protocolEngine; } + public void setProtocolEngineFactory(ProtocolEngineFactory engineFactory, boolean acceptingConnections) + { + _factory = engineFactory; + _acceptingConnections = acceptingConnections; + } + } Added: qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java?rev=810110&view=auto ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java (added) +++ qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java Tue Sep 1 16:27:52 2009 @@ -0,0 +1,130 @@ +package org.apache.qpid.codec; + +import java.nio.ByteBuffer; +import java.util.ArrayList; + +import junit.framework.TestCase; + +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQProtocolVersionException; +import org.apache.qpid.framing.HeartbeatBody; + +public class AMQDecoderTest extends TestCase +{ + + private AMQCodecFactory _factory; + private AMQDecoder _decoder; + + + public void setUp() + { + _factory = new AMQCodecFactory(false, null); + _decoder = _factory.getDecoder(); + } + + + public void testSingleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + { + ByteBuffer msg = 
HeartbeatBody.FRAME.toNioByteBuffer(); + ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg); + if (frames.get(0) instanceof AMQFrame) + { + assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType()); + } + else + { + fail("decode was not a frame"); + } + } + + public void testPartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + { + ByteBuffer msg = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msgA = msg.slice(); + int msgbPos = msg.remaining() / 2; + int msgaLimit = msg.remaining() - msgbPos; + msgA.limit(msgaLimit); + msg.position(msgbPos); + ByteBuffer msgB = msg.slice(); + ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msgA); + assertEquals(0, frames.size()); + frames = _decoder.decodeBuffer(msgB); + assertEquals(1, frames.size()); + if (frames.get(0) instanceof AMQFrame) + { + assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType()); + } + else + { + fail("decode was not a frame"); + } + } + + public void testMultipleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + { + ByteBuffer msgA = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msgB = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msg = ByteBuffer.allocate(msgA.remaining() + msgB.remaining()); + msg.put(msgA); + msg.put(msgB); + msg.flip(); + ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg); + assertEquals(2, frames.size()); + for (AMQDataBlock frame : frames) + { + if (frame instanceof AMQFrame) + { + assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frame).getBodyFrame().getFrameType()); + } + else + { + fail("decode was not a frame"); + } + } + } + + public void testMultiplePartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException + { + ByteBuffer msgA = HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msgB = 
HeartbeatBody.FRAME.toNioByteBuffer(); + ByteBuffer msgC = HeartbeatBody.FRAME.toNioByteBuffer(); + + ByteBuffer sliceA = ByteBuffer.allocate(msgA.remaining() + msgB.remaining() / 2); + sliceA.put(msgA); + int limit = msgB.limit(); + int pos = msgB.remaining() / 2; + msgB.limit(pos); + sliceA.put(msgB); + sliceA.flip(); + msgB.limit(limit); + msgB.position(pos); + + ByteBuffer sliceB = ByteBuffer.allocate(msgB.remaining() + pos); + sliceB.put(msgB); + msgC.limit(pos); + sliceB.put(msgC); + sliceB.flip(); + msgC.limit(limit); + + ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(sliceA); + assertEquals(1, frames.size()); + frames = _decoder.decodeBuffer(sliceB); + assertEquals(1, frames.size()); + frames = _decoder.decodeBuffer(msgC); + assertEquals(1, frames.size()); + for (AMQDataBlock frame : frames) + { + if (frame instanceof AMQFrame) + { + assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frame).getBodyFrame().getFrameType()); + } + else + { + fail("decode was not a frame"); + } + } + } + +} Added: qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java?rev=810110&view=auto ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java (added) +++ qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java Tue Sep 1 16:27:52 2009 @@ -0,0 +1,95 @@ +package org.apache.qpid.codec; + +import java.nio.ByteBuffer; + +import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.ContentBody; 
+import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.Sender; + +public class MockAMQVersionAwareProtocolSession implements AMQVersionAwareProtocolSession +{ + + @Override + public void contentBodyReceived(int channelId, ContentBody body) throws AMQException + { + // TODO Auto-generated method stub + + } + + @Override + public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException + { + // TODO Auto-generated method stub + + } + + @Override + public MethodRegistry getMethodRegistry() + { + return MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); + } + + @Override + public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException + { + // TODO Auto-generated method stub + + } + + @Override + public void init() + { + // TODO Auto-generated method stub + + } + + @Override + public void methodFrameReceived(int channelId, AMQMethodBody body) throws AMQException + { + // TODO Auto-generated method stub + + } + + @Override + public void setSender(Sender<ByteBuffer> sender) + { + // TODO Auto-generated method stub + + } + + @Override + public void writeFrame(AMQDataBlock frame) + { + // TODO Auto-generated method stub + + } + + @Override + public byte getProtocolMajorVersion() + { + // TODO Auto-generated method stub + return 0; + } + + @Override + public byte getProtocolMinorVersion() + { + // TODO Auto-generated method stub + return 0; + } + + @Override + public ProtocolVersion getProtocolVersion() + { + // TODO Auto-generated method stub + return null; + } + +} Modified: qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java URL: 
http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java?rev=810110&r1=810109&r2=810110&view=diff ============================================================================== --- qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java (original) +++ qpid/branches/java-network-refactor/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java Tue Sep 1 16:27:52 2009 @@ -299,7 +299,7 @@ _countingEngine.setNewLatch(TEST_DATA.getBytes().length); _client.send(ByteBuffer.wrap(TEST_DATA.getBytes())); _countingEngine.getExceptionLatch().await(2, TimeUnit.SECONDS); - assertEquals("Exception should not been thrown", 0, + assertEquals("Exception should have been thrown", 0, _countingEngine.getExceptionLatch().getCount()); } @@ -321,11 +321,12 @@ { EchoProtocolEngine _engine = null; - public ProtocolEngine newProtocolEngine() + public ProtocolEngine newProtocolEngine(NetworkDriver driver) { if (_engine == null) { _engine = new EchoProtocolEngine(); + _engine.setNetworkDriver(driver); } return getEngine(); } --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org