Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java Fri Oct 10 09:54:36 2014 @@ -119,12 +119,12 @@ public class ConnectionTuneOkBody extend return buf.toString(); } - public static <T> T process(final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) throws IOException + public static void process(final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { int channelMax = buffer.readUnsignedShort(); long frameMax = EncodingUtils.readUnsignedInteger(buffer); int heartbeat = buffer.readUnsignedShort(); - return dispatcher.connectionTuneOk(channelMax, frameMax, heartbeat); + dispatcher.receiveConnectionTuneOk(channelMax, frameMax, heartbeat); } }
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java Fri Oct 10 09:54:36 2014 @@ -92,14 +92,14 @@ public class ContentBody implements AMQB return _payload; } - public static <T> T process(final int channel, + public static void process(final int channel, final MarkableDataInput in, - final MethodProcessor<T> methodProcessor, final long bodySize) + final MethodProcessor methodProcessor, final long bodySize) throws IOException { byte[] payload = new byte[(int)bodySize]; in.readFully(payload); - return methodProcessor.messageContent(channel, payload); + methodProcessor.receiveMessageContent(channel, payload); } private static class BufferContentBody implements AMQBody Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Fri Oct 10 09:54:36 2014 @@ -155,9 +155,9 @@ public class ContentHeaderBody implement _bodySize = bodySize; } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> methodProcessor, final long size) + final MethodProcessor methodProcessor, final long size) throws IOException, AMQFrameDecodingException { @@ -175,6 +175,6 @@ public class ContentHeaderBody implement properties = new BasicContentHeaderProperties(); properties.populatePropertiesFromBuffer(buffer, propertyFlags, (int)(size-14)); - return methodProcessor.messageHeader(channelId, properties, bodySize); + methodProcessor.receiveMessageHeader(channelId, properties, bodySize); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java Fri Oct 10 09:54:36 2014 @@ -122,13 +122,13 @@ public class ExchangeBoundBody extends A return buf.toString(); } - public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) + public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { AMQShortString exchange = buffer.readAMQShortString(); AMQShortString routingKey = buffer.readAMQShortString(); AMQShortString queue = buffer.readAMQShortString(); - return dispatcher.exchangeBound(channelId, exchange, routingKey, queue); + dispatcher.receiveExchangeBound(channelId, exchange, routingKey, queue); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java Fri Oct 10 09:54:36 2014 @@ -108,12 +108,12 @@ public class ExchangeBoundOkBody extends return buf.toString(); } - public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) + public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { int replyCode = buffer.readUnsignedShort(); AMQShortString replyText = buffer.readAMQShortString(); - return dispatcher.exchangeBoundOk(channelId, replyCode, replyText); + dispatcher.receiveExchangeBoundOk(channelId, replyCode, replyText); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java Fri Oct 10 09:54:36 2014 @@ -204,9 +204,9 @@ public class ExchangeDeclareBody extends return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException + final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int ticket = buffer.readUnsignedShort(); @@ -219,6 +219,14 @@ public class ExchangeDeclareBody extends boolean internal = (bitfield & 0x8) == 0x8; boolean nowait = (bitfield & 0x10) == 0x10; FieldTable arguments = EncodingUtils.readFieldTable(buffer); - return dispatcher.exchangeDeclare(channelId, exchange, type, passive, durable, autoDelete, internal, nowait, arguments); + dispatcher.receiveExchangeDeclare(channelId, + exchange, + type, + passive, + durable, + autoDelete, + internal, + nowait, + arguments); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java Fri Oct 10 09:54:36 2014 @@ -138,7 +138,7 @@ public class ExchangeDeleteBody extends return buf.toString(); } - public static <T> T process(final int channelId, final MarkableDataInput buffer, final MethodProcessor<T> dispatcher) + public static void process(final int channelId, final MarkableDataInput buffer, final MethodProcessor dispatcher) throws IOException { @@ -147,6 +147,6 @@ public class ExchangeDeleteBody extends byte bitfield = buffer.readByte(); boolean ifUnused = (bitfield & 0x01) == 0x01; boolean nowait = (bitfield & 0x02) == 0x02; - return dispatcher.exchangeDelete(channelId, exchange, ifUnused, nowait); + dispatcher.receiveExchangeDelete(channelId, exchange, ifUnused, nowait); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java Fri Oct 10 09:54:36 2014 @@ -20,484 +20,506 @@ */ package org.apache.qpid.framing; -public class FrameCreatingMethodProcessor implements MethodProcessor<AMQFrame> +import java.util.ArrayList; +import java.util.List; + +public class FrameCreatingMethodProcessor implements MethodProcessor { - private final MethodRegistry _methodRegistry; + private ProtocolVersion _protocolVersion; + + private final List<AMQDataBlock> _processedMethods = new ArrayList<>(); - FrameCreatingMethodProcessor(final MethodRegistry methodRegistry) + public FrameCreatingMethodProcessor(final ProtocolVersion protocolVersion) { - _methodRegistry = methodRegistry; + _protocolVersion = protocolVersion; } + public List<AMQDataBlock> getProcessedMethods() + { + return _processedMethods; + } + @Override - public AMQFrame connectionStart(final short versionMajor, - final short versionMinor, - final FieldTable serverProperties, - final byte[] mechanisms, - final byte[] locales) + public void receiveConnectionStart(final short versionMajor, + final short versionMinor, + final FieldTable serverProperties, + final byte[] mechanisms, + final byte[] locales) { - return new AMQFrame(0, new ConnectionStartBody(versionMajor, versionMinor, serverProperties, mechanisms, locales)); + _processedMethods.add(new AMQFrame(0, new ConnectionStartBody(versionMajor, versionMinor, serverProperties, mechanisms, locales))); } @Override - public AMQFrame connectionStartOk(final FieldTable clientProperties, - final AMQShortString mechanism, - final byte[] response, - final AMQShortString locale) + public void receiveConnectionStartOk(final FieldTable clientProperties, + final AMQShortString mechanism, + final byte[] response, + final AMQShortString locale) { - return new AMQFrame(0, new ConnectionStartOkBody(clientProperties, mechanism, response, locale)); + _processedMethods.add(new AMQFrame(0, new ConnectionStartOkBody(clientProperties, mechanism, response, locale))); } @Override - public AMQFrame txSelect(final int channelId) + public void receiveTxSelect(final int channelId) { - return new AMQFrame(channelId, TxSelectBody.INSTANCE); + _processedMethods.add(new AMQFrame(channelId, TxSelectBody.INSTANCE)); } @Override - public AMQFrame txSelectOk(final int channelId) + public void receiveTxSelectOk(final int channelId) { - return new AMQFrame(channelId, TxSelectOkBody.INSTANCE); + _processedMethods.add(new AMQFrame(channelId, TxSelectOkBody.INSTANCE)); } @Override - public AMQFrame txCommit(final int channelId) + public void receiveTxCommit(final int channelId) { - return new AMQFrame(channelId, TxCommitBody.INSTANCE); + _processedMethods.add(new AMQFrame(channelId, TxCommitBody.INSTANCE)); } @Override - public AMQFrame txCommitOk(final int channelId) + public void receiveTxCommitOk(final int channelId) { - return new AMQFrame(channelId, TxCommitOkBody.INSTANCE); + _processedMethods.add(new AMQFrame(channelId, TxCommitOkBody.INSTANCE)); } @Override - public AMQFrame txRollback(final int channelId) + public void receiveTxRollback(final int channelId) { - return new AMQFrame(channelId, TxRollbackBody.INSTANCE); + _processedMethods.add(new AMQFrame(channelId, TxRollbackBody.INSTANCE)); } @Override - public AMQFrame txRollbackOk(final int channelId) + public void receiveTxRollbackOk(final int channelId) { - return new AMQFrame(channelId, TxRollbackOkBody.INSTANCE); + _processedMethods.add(new AMQFrame(channelId, TxRollbackOkBody.INSTANCE)); } @Override - public AMQFrame connectionSecure(final byte[] challenge) + public void receiveConnectionSecure(final byte[] challenge) { - return new AMQFrame(0, new ConnectionSecureBody(challenge)); + _processedMethods.add(new AMQFrame(0, new ConnectionSecureBody(challenge))); } @Override - public AMQFrame connectionSecureOk(final byte[] response) + public void receiveConnectionSecureOk(final byte[] response) { - return new AMQFrame(0, new ConnectionSecureOkBody(response)); + _processedMethods.add(new AMQFrame(0, new ConnectionSecureOkBody(response))); } @Override - public AMQFrame connectionTune(final int channelMax, final long frameMax, final int heartbeat) + public void receiveConnectionTune(final int channelMax, final long frameMax, final int heartbeat) { - return new AMQFrame(0, new ConnectionTuneBody(channelMax, frameMax, heartbeat)); + _processedMethods.add(new AMQFrame(0, new ConnectionTuneBody(channelMax, frameMax, heartbeat))); } @Override - public AMQFrame connectionTuneOk(final int channelMax, final long frameMax, final int heartbeat) + public void receiveConnectionTuneOk(final int channelMax, final long frameMax, final int heartbeat) { - return new AMQFrame(0, new ConnectionTuneOkBody(channelMax, frameMax, heartbeat)); + _processedMethods.add(new AMQFrame(0, new ConnectionTuneOkBody(channelMax, frameMax, heartbeat))); } @Override - public AMQFrame connectionOpen(final AMQShortString virtualHost, - final AMQShortString capabilities, - final boolean insist) + public void receiveConnectionOpen(final AMQShortString virtualHost, + final AMQShortString capabilities, + final boolean insist) { - return new AMQFrame(0, new ConnectionOpenBody(virtualHost, capabilities, insist)); + _processedMethods.add(new AMQFrame(0, new ConnectionOpenBody(virtualHost, capabilities, insist))); } @Override - public AMQFrame connectionOpenOk(final AMQShortString knownHosts) + public void receiveConnectionOpenOk(final AMQShortString knownHosts) { - return new AMQFrame(0, new ConnectionOpenOkBody(knownHosts)); + _processedMethods.add(new AMQFrame(0, new ConnectionOpenOkBody(knownHosts))); } @Override - public AMQFrame connectionRedirect(final AMQShortString host, final AMQShortString knownHosts) + public void receiveConnectionRedirect(final AMQShortString host, final AMQShortString knownHosts) { - return new AMQFrame(0, new ConnectionRedirectBody(getProtocolVersion(), host, knownHosts)); + _processedMethods.add(new AMQFrame(0, new ConnectionRedirectBody(getProtocolVersion(), host, knownHosts))); } @Override - public AMQFrame connectionClose(final int replyCode, - final AMQShortString replyText, - final int classId, - final int methodId) + public void receiveConnectionClose(final int replyCode, + final AMQShortString replyText, + final int classId, + final int methodId) { - return new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), replyCode, replyText, classId, methodId)); + _processedMethods.add(new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), replyCode, replyText, classId, methodId))); } @Override - public AMQFrame connectionCloseOk() + public void receiveConnectionCloseOk() { - return new AMQFrame(0, ProtocolVersion.v8_0.equals(getProtocolVersion()) + _processedMethods.add(new AMQFrame(0, ProtocolVersion.v8_0.equals(getProtocolVersion()) ? ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_8 - : ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9); + : ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9)); } @Override - public AMQFrame channelOpen(final int channelId) + public void receiveChannelOpen(final int channelId) { - return new AMQFrame(channelId, new ChannelOpenBody()); + _processedMethods.add(new AMQFrame(channelId, new ChannelOpenBody())); } @Override - public AMQFrame channelOpenOk(final int channelId) + public void receiveChannelOpenOk(final int channelId) { - return new AMQFrame(channelId, ProtocolVersion.v8_0.equals(getProtocolVersion()) + _processedMethods.add(new AMQFrame(channelId, ProtocolVersion.v8_0.equals(getProtocolVersion()) ? ChannelOpenOkBody.INSTANCE_0_8 - : ChannelOpenOkBody.INSTANCE_0_9); + : ChannelOpenOkBody.INSTANCE_0_9)); } @Override - public AMQFrame channelFlow(final int channelId, final boolean active) + public void receiveChannelFlow(final int channelId, final boolean active) { - return new AMQFrame(channelId, new ChannelFlowBody(active)); + _processedMethods.add(new AMQFrame(channelId, new ChannelFlowBody(active))); } @Override - public AMQFrame channelFlowOk(final int channelId, final boolean active) + public void receiveChannelFlowOk(final int channelId, final boolean active) { - return new AMQFrame(channelId, new ChannelFlowOkBody(active)); + _processedMethods.add(new AMQFrame(channelId, new ChannelFlowOkBody(active))); } @Override - public AMQFrame channelAlert(final int channelId, - final int replyCode, - final AMQShortString replyText, - final FieldTable details) + public void receiveChannelAlert(final int channelId, + final int replyCode, + final AMQShortString replyText, + final FieldTable details) { - return new AMQFrame(channelId, new ChannelAlertBody(replyCode, replyText, details)); + _processedMethods.add(new AMQFrame(channelId, new ChannelAlertBody(replyCode, replyText, details))); } @Override - public AMQFrame channelClose(final int channelId, - final int replyCode, - final AMQShortString replyText, - final int classId, - final int methodId) + public void receiveChannelClose(final int channelId, + final int replyCode, + final AMQShortString replyText, + final int classId, + final int methodId) { - return new AMQFrame(channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId)); + _processedMethods.add(new AMQFrame(channelId, new ChannelCloseBody(replyCode, replyText, classId, methodId))); } @Override - public AMQFrame channelCloseOk(final int channelId) + public void receiveChannelCloseOk(final int channelId) { - return new AMQFrame(channelId, ChannelCloseOkBody.INSTANCE); + _processedMethods.add(new AMQFrame(channelId, ChannelCloseOkBody.INSTANCE)); } @Override - public AMQFrame accessRequest(final int channelId, - final AMQShortString realm, - final boolean exclusive, - final boolean passive, - final boolean active, - final boolean write, - final boolean read) + public void receiveAccessRequest(final int channelId, + final AMQShortString realm, + final boolean exclusive, + final boolean passive, + final boolean active, + final boolean write, + final boolean read) { - return new AMQFrame(channelId, new AccessRequestBody(realm, exclusive, passive, active, write, read)); + _processedMethods.add(new AMQFrame(channelId, new AccessRequestBody(realm, exclusive, passive, active, write, read))); } @Override - public AMQFrame accessRequestOk(final int channelId, final int ticket) + public void receiveAccessRequestOk(final int channelId, final int ticket) { - return new AMQFrame(channelId, new AccessRequestOkBody(ticket)); + _processedMethods.add(new AMQFrame(channelId, new AccessRequestOkBody(ticket))); } @Override - public AMQFrame exchangeDeclare(final int channelId, - final AMQShortString exchange, - final AMQShortString type, - final boolean passive, - final boolean durable, - final boolean autoDelete, - final boolean internal, - final boolean nowait, final FieldTable arguments) + public void receiveExchangeDeclare(final int channelId, + final AMQShortString exchange, + final AMQShortString type, + final boolean passive, + final boolean durable, + final boolean autoDelete, + final boolean internal, + final boolean nowait, final FieldTable arguments) { - return new AMQFrame(channelId, new ExchangeDeclareBody(0, exchange, type, passive, durable, autoDelete, internal, nowait, arguments)); + _processedMethods.add(new AMQFrame(channelId, new ExchangeDeclareBody(0, exchange, type, passive, durable, autoDelete, internal, nowait, arguments))); } @Override - public AMQFrame exchangeDeclareOk(final int channelId) + public void receiveExchangeDeclareOk(final int channelId) { - return new AMQFrame(channelId, new ExchangeDeclareOkBody()); + _processedMethods.add(new AMQFrame(channelId, new ExchangeDeclareOkBody())); } @Override - public AMQFrame exchangeDelete(final int channelId, - final AMQShortString exchange, - final boolean ifUnused, - final boolean nowait) + public void receiveExchangeDelete(final int channelId, + final AMQShortString exchange, + final boolean ifUnused, + final boolean nowait) { - return new AMQFrame(channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait)); + _processedMethods.add(new AMQFrame(channelId, new ExchangeDeleteBody(0, exchange, ifUnused, nowait))); } @Override - public AMQFrame exchangeDeleteOk(final int channelId) + public void receiveExchangeDeleteOk(final int channelId) { - return new AMQFrame(channelId, new ExchangeDeleteOkBody()); + _processedMethods.add(new AMQFrame(channelId, new ExchangeDeleteOkBody())); } @Override - public AMQFrame exchangeBound(final int channelId, - final AMQShortString exchange, - final AMQShortString routingKey, - final AMQShortString queue) + public void receiveExchangeBound(final int channelId, + final AMQShortString exchange, + final AMQShortString routingKey, + final AMQShortString queue) { - return new AMQFrame(channelId, new ExchangeBoundBody(exchange, routingKey, queue)); + _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundBody(exchange, routingKey, queue))); } @Override - public AMQFrame exchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText) + public void receiveExchangeBoundOk(final int channelId, final int replyCode, final AMQShortString replyText) { - return new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText)); + _processedMethods.add(new AMQFrame(channelId, new ExchangeBoundOkBody(replyCode, replyText))); } @Override - public AMQFrame queueBindOk(final int channelId) + public void receiveQueueBindOk(final int channelId) { - return new AMQFrame(channelId, new QueueBindOkBody()); + _processedMethods.add(new AMQFrame(channelId, new QueueBindOkBody())); } @Override - public AMQFrame queueUnbindOk(final int channelId) + public void receiveQueueUnbindOk(final int channelId) { - return new AMQFrame(channelId, new QueueUnbindOkBody()); + _processedMethods.add(new AMQFrame(channelId, new QueueUnbindOkBody())); } @Override - public AMQFrame queueDeclare(final int channelId, - final AMQShortString queue, - final boolean passive, - final boolean durable, - final boolean exclusive, - final boolean autoDelete, - final boolean nowait, - final FieldTable arguments) + public void receiveQueueDeclare(final int channelId, + final AMQShortString queue, + final boolean passive, + final boolean durable, + final boolean exclusive, + final boolean autoDelete, + final boolean nowait, + final FieldTable arguments) { - return new AMQFrame(channelId, new QueueDeclareBody(0, queue, passive, durable, exclusive, autoDelete, nowait, arguments)); + _processedMethods.add(new AMQFrame(channelId, new QueueDeclareBody(0, queue, passive, durable, exclusive, autoDelete, nowait, arguments))); } @Override - public AMQFrame queueDeclareOk(final int channelId, - final AMQShortString queue, - final long messageCount, - final long consumerCount) + public void receiveQueueDeclareOk(final int channelId, + final AMQShortString queue, + final long messageCount, + final long consumerCount) { - return new AMQFrame(channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount)); + _processedMethods.add(new AMQFrame(channelId, new QueueDeclareOkBody(queue, messageCount, consumerCount))); } @Override - public AMQFrame queueBind(final int channelId, - final AMQShortString queue, - final AMQShortString exchange, - final AMQShortString bindingKey, - final boolean nowait, - final FieldTable arguments) + public void receiveQueueBind(final int channelId, + final AMQShortString queue, + final AMQShortString exchange, + final AMQShortString bindingKey, + final boolean nowait, + final FieldTable arguments) { - return new AMQFrame(channelId, new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments)); + _processedMethods.add(new AMQFrame(channelId, new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments))); } @Override - public AMQFrame queuePurge(final int channelId, final AMQShortString queue, final boolean nowait) + public void receiveQueuePurge(final int channelId, final AMQShortString queue, final boolean nowait) { - return new AMQFrame(channelId, new QueuePurgeBody(0, queue, nowait)); + _processedMethods.add(new AMQFrame(channelId, new QueuePurgeBody(0, queue, nowait))); } @Override - public AMQFrame queuePurgeOk(final int channelId, final long messageCount) + public void receiveQueuePurgeOk(final int channelId, final long messageCount) { - return new AMQFrame(channelId, new QueuePurgeOkBody(messageCount)); + _processedMethods.add(new AMQFrame(channelId, new QueuePurgeOkBody(messageCount))); } @Override - public AMQFrame queueDelete(final int channelId, - final AMQShortString queue, - final boolean ifUnused, - final boolean ifEmpty, - final boolean nowait) + public void receiveQueueDelete(final int channelId, + final AMQShortString queue, + final boolean ifUnused, + final boolean ifEmpty, + final boolean nowait) { - return new AMQFrame(channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait)); + _processedMethods.add(new AMQFrame(channelId, new QueueDeleteBody(0, queue, ifUnused, ifEmpty, nowait))); } @Override - public AMQFrame queueDeleteOk(final int channelId, final long messageCount) + public void receiveQueueDeleteOk(final int channelId, final long messageCount) { - return new AMQFrame(channelId, new QueueDeleteOkBody(messageCount)); + _processedMethods.add(new AMQFrame(channelId, new QueueDeleteOkBody(messageCount))); } @Override - public AMQFrame queueUnbind(final int channelId, - final AMQShortString queue, - final AMQShortString exchange, - final AMQShortString bindingKey, - final FieldTable arguments) + public void receiveQueueUnbind(final int channelId, + final AMQShortString queue, + final AMQShortString exchange, + final AMQShortString bindingKey, + final FieldTable arguments) { - return new AMQFrame(channelId, new QueueUnbindBody(0, queue, exchange, bindingKey, arguments)); + _processedMethods.add(new AMQFrame(channelId, new QueueUnbindBody(0, queue, exchange, bindingKey, arguments))); } @Override - public AMQFrame basicRecoverSyncOk(final int channelId) + public void receiveBasicRecoverSyncOk(final int channelId) { - return new AMQFrame(channelId, new BasicRecoverSyncOkBody(getProtocolVersion())); + _processedMethods.add(new AMQFrame(channelId, new BasicRecoverSyncOkBody(getProtocolVersion()))); } @Override - public AMQFrame basicRecover(final int channelId, final boolean requeue, final boolean sync) + public void receiveBasicRecover(final int channelId, final boolean requeue, final boolean sync) { if(ProtocolVersion.v8_0.equals(getProtocolVersion()) || !sync) { - return new AMQFrame(channelId, new BasicRecoverBody(requeue)); + _processedMethods.add(new AMQFrame(channelId, new BasicRecoverBody(requeue))); } else { - return new AMQFrame(channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue)); + _processedMethods.add(new AMQFrame(channelId, new BasicRecoverSyncBody(getProtocolVersion(), requeue))); } } @Override - public AMQFrame basicQos(final int channelId, - final long prefetchSize, - final int prefetchCount, - final boolean global) + public void receiveBasicQos(final int channelId, + final long prefetchSize, + final int prefetchCount, + final boolean global) { - return new AMQFrame(channelId, new BasicQosBody(prefetchSize, prefetchCount, global)); + _processedMethods.add(new AMQFrame(channelId, new BasicQosBody(prefetchSize, prefetchCount, global))); } @Override - public AMQFrame basicQosOk(final int channelId) + public void receiveBasicQosOk(final int channelId) { - return new AMQFrame(channelId, new BasicQosOkBody()); + _processedMethods.add(new AMQFrame(channelId, new BasicQosOkBody())); } @Override - public AMQFrame basicConsume(final int channelId, - final AMQShortString queue, - final AMQShortString consumerTag, - final boolean noLocal, - final boolean noAck, - final boolean exclusive, - final boolean nowait, - final FieldTable arguments) + public void receiveBasicConsume(final int channelId, + final AMQShortString queue, + final AMQShortString consumerTag, + final boolean noLocal, + final boolean noAck, + final boolean exclusive, + final boolean nowait, + final FieldTable arguments) { - return new AMQFrame(channelId, new BasicConsumeBody(0, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments)); + _processedMethods.add(new AMQFrame(channelId, new BasicConsumeBody(0, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments))); } @Override - public AMQFrame basicConsumeOk(final int channelId, final AMQShortString consumerTag) + public void receiveBasicConsumeOk(final int channelId, final AMQShortString consumerTag) { - return new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag)); + _processedMethods.add(new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag))); } @Override - public AMQFrame basicCancel(final int channelId, final AMQShortString consumerTag, final boolean noWait) + public void receiveBasicCancel(final int channelId, final AMQShortString consumerTag, final boolean noWait) { - return new AMQFrame(channelId, new BasicCancelBody(consumerTag, noWait)); + _processedMethods.add(new AMQFrame(channelId, new BasicCancelBody(consumerTag, noWait))); } @Override - public AMQFrame basicCancelOk(final int channelId, final AMQShortString consumerTag) + public void receiveBasicCancelOk(final int channelId, final AMQShortString consumerTag) { - return new AMQFrame(channelId, new BasicCancelOkBody(consumerTag)); + _processedMethods.add(new AMQFrame(channelId, new BasicCancelOkBody(consumerTag))); } @Override - public AMQFrame basicPublish(final int channelId, - final AMQShortString exchange, - final AMQShortString routingKey, - final boolean mandatory, - final boolean immediate) + public void receiveBasicPublish(final int channelId, + final AMQShortString exchange, + final AMQShortString routingKey, + final boolean mandatory, + final boolean immediate) { - return new AMQFrame(channelId, new BasicPublishBody(0, exchange, routingKey, mandatory, immediate)); + _processedMethods.add(new AMQFrame(channelId, new BasicPublishBody(0, exchange, routingKey, mandatory, immediate))); } @Override - public AMQFrame basicReturn(final int channelId, final int replyCode, - final AMQShortString replyText, - final AMQShortString exchange, - final AMQShortString routingKey) + public void receiveBasicReturn(final int channelId, final int replyCode, + final AMQShortString replyText, + final AMQShortString exchange, + final AMQShortString routingKey) { - return new AMQFrame(channelId, new BasicReturnBody(replyCode, replyText, exchange, routingKey)); + _processedMethods.add(new AMQFrame(channelId, new BasicReturnBody(replyCode, replyText, exchange, routingKey))); } @Override - public AMQFrame basicDeliver(final int channelId, - final AMQShortString consumerTag, - final long deliveryTag, - final boolean redelivered, - final AMQShortString exchange, - final AMQShortString routingKey) + public void receiveBasicDeliver(final int channelId, + final AMQShortString consumerTag, + final long deliveryTag, + final boolean redelivered, + final AMQShortString exchange, + final AMQShortString routingKey) { - return new AMQFrame(channelId, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey)); + _processedMethods.add(new AMQFrame(channelId, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey))); } @Override - public AMQFrame basicGet(final int channelId, final AMQShortString queue, final boolean noAck) + public void receiveBasicGet(final int channelId, final AMQShortString queue, final boolean noAck) { - return new AMQFrame(channelId, new BasicGetBody(0, queue, noAck)); + _processedMethods.add(new AMQFrame(channelId, new BasicGetBody(0, queue, noAck))); } @Override - public AMQFrame basicGetOk(final int channelId, - final long deliveryTag, - final boolean redelivered, - final AMQShortString exchange, - final AMQShortString routingKey, - final long messageCount) + public void receiveBasicGetOk(final int channelId, + final long deliveryTag, + final boolean redelivered, + final AMQShortString exchange, + final AMQShortString routingKey, + final long messageCount) + { + _processedMethods.add(new AMQFrame(channelId, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount))); + } + + @Override + public void receiveBasicGetEmpty(final int channelId) { - return new AMQFrame(channelId, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount)); + _processedMethods.add(new AMQFrame(channelId, new BasicGetEmptyBody((AMQShortString)null))); } @Override - public AMQFrame basicGetEmpty(final int channelId) + public void receiveBasicAck(final int channelId, final long deliveryTag, final boolean multiple) { - return new AMQFrame(channelId, new BasicGetEmptyBody((AMQShortString)null)); + _processedMethods.add(new AMQFrame(channelId, new BasicAckBody(deliveryTag, multiple))); } @Override - public AMQFrame basicAck(final int channelId, final long deliveryTag, final boolean multiple) + public void receiveBasicReject(final int channelId, final long deliveryTag, final boolean requeue) { - return new AMQFrame(channelId, new BasicAckBody(deliveryTag, multiple)); + _processedMethods.add(new AMQFrame(channelId, new BasicRejectBody(deliveryTag, requeue))); } @Override - public AMQFrame basicReject(final int channelId, final long deliveryTag, final boolean requeue) + public void receiveHeartbeat() { - return new AMQFrame(channelId, new BasicRejectBody(deliveryTag, requeue)); + _processedMethods.add(new AMQFrame(0, new HeartbeatBody())); } @Override - public AMQFrame heartbeat() + public ProtocolVersion getProtocolVersion() { - return new AMQFrame(0, new HeartbeatBody()); + return _protocolVersion; } - private ProtocolVersion getProtocolVersion() + public void setProtocolVersion(final ProtocolVersion protocolVersion) + { + _protocolVersion = protocolVersion; + } + + @Override + public void receiveMessageContent(final int channelId, final byte[] data) { - return _methodRegistry.getProtocolVersion(); + _processedMethods.add(new AMQFrame(channelId, new ContentBody(data))); } @Override - public AMQFrame messageContent(final int channelId, final byte[] data) + public void receiveMessageHeader(final int channelId, + final BasicContentHeaderProperties properties, + final long bodySize) { - return new AMQFrame(channelId, new ContentBody(data)); + _processedMethods.add(new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize))); } @Override - public AMQFrame messageHeader(final int channelId, - final BasicContentHeaderProperties properties, - final long bodySize) + public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation) { - return new AMQFrame(channelId, new ContentHeaderBody(properties, bodySize)); + _processedMethods.add(protocolInitiation); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java Fri Oct 10 09:54:36 2014 @@ -81,9 +81,9 @@ public class HeartbeatBody implements AM return new AMQFrame(0, this); } - public static <T> T process(final int channel, + public static void process(final int channel, final MarkableDataInput in, - final MethodProcessor<T> processor, + final MethodProcessor processor, final long bodySize) throws IOException { @@ -91,6 +91,6 @@ public class HeartbeatBody implements AM { in.skip(bodySize); } - return processor.heartbeat(); + processor.receiveHeartbeat(); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java Fri Oct 10 09:54:36 2014 @@ -20,178 +20,182 @@ */ package org.apache.qpid.framing; -public interface MethodProcessor<T> +public interface MethodProcessor { - T connectionStart(short versionMajor, - short versionMinor, - FieldTable serverProperties, - byte[] mechanisms, - byte[] locales); + ProtocolVersion getProtocolVersion(); - T connectionStartOk(FieldTable clientProperties, - AMQShortString mechanism, - byte[] response, - AMQShortString locale); + void receiveConnectionStart(short versionMajor, + short versionMinor, + FieldTable serverProperties, + byte[] mechanisms, + byte[] locales); - T txSelect(int channelId); + void receiveConnectionStartOk(FieldTable clientProperties, + AMQShortString mechanism, + byte[] response, + AMQShortString locale); - T txSelectOk(int channelId); + void receiveTxSelect(int channelId); - T txCommit(int channelId); + void receiveTxSelectOk(int channelId); - T txCommitOk(int channelId); + void receiveTxCommit(int channelId); - T txRollback(int channelId); + void receiveTxCommitOk(int channelId); - T txRollbackOk(int channelId); + void receiveTxRollback(int channelId); - T connectionSecure(byte[] challenge); + void receiveTxRollbackOk(int channelId); - T connectionSecureOk(byte[] response); + void receiveConnectionSecure(byte[] challenge); - T connectionTune(int channelMax, long frameMax, int heartbeat); + void receiveConnectionSecureOk(byte[] response); - T connectionTuneOk(int channelMax, long frameMax, int heartbeat); + void receiveConnectionTune(int channelMax, long frameMax, int heartbeat); - T connectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist); + void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat); - T connectionOpenOk(AMQShortString knownHosts); + void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist); - T connectionRedirect(AMQShortString host, AMQShortString knownHosts); + void receiveConnectionOpenOk(AMQShortString knownHosts); - T connectionClose(int replyCode, AMQShortString replyText, int classId, int methodId); + void receiveConnectionRedirect(AMQShortString host, AMQShortString knownHosts); - T connectionCloseOk(); + void receiveConnectionClose(int replyCode, AMQShortString replyText, int classId, int methodId); - T channelOpen(int channelId); + void receiveConnectionCloseOk(); - T channelOpenOk(int channelId); + void receiveChannelOpen(int channelId); - T channelFlow(int channelId, boolean active); + void receiveChannelOpenOk(int channelId); - T channelFlowOk(int channelId, boolean active); + void receiveChannelFlow(int channelId, boolean active); - T channelAlert(int channelId, int replyCode, final AMQShortString replyText, FieldTable details); + void receiveChannelFlowOk(int channelId, boolean active); - T channelClose(int channelId, int replyCode, AMQShortString replyText, int classId, int methodId); + void receiveChannelAlert(int channelId, int replyCode, final AMQShortString replyText, FieldTable details); - T channelCloseOk(int channelId); + void receiveChannelClose(int channelId, int replyCode, AMQShortString replyText, int classId, int methodId); - T accessRequest(int channelId, - AMQShortString realm, - boolean exclusive, - boolean passive, - boolean active, - boolean write, boolean read); + void receiveChannelCloseOk(int channelId); - T accessRequestOk(int channelId, int ticket); + void receiveAccessRequest(int channelId, + AMQShortString realm, + boolean exclusive, + boolean passive, + boolean active, + boolean write, boolean read); - T exchangeDeclare(int channelId, - AMQShortString exchange, - AMQShortString type, - boolean passive, - boolean durable, - boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments); + void receiveAccessRequestOk(int channelId, int ticket); - T exchangeDeclareOk(int channelId); + void receiveExchangeDeclare(int channelId, + AMQShortString exchange, + AMQShortString type, + boolean passive, + boolean durable, + boolean autoDelete, boolean internal, boolean nowait, final FieldTable arguments); - T exchangeDelete(int channelId, AMQShortString exchange, boolean ifUnused, boolean nowait); + void receiveExchangeDeclareOk(int channelId); - T exchangeDeleteOk(int channelId); + void receiveExchangeDelete(int channelId, AMQShortString exchange, boolean ifUnused, boolean nowait); - T exchangeBound(int channelId, AMQShortString exchange, AMQShortString routingKey, AMQShortString queue); + void receiveExchangeDeleteOk(int channelId); - T exchangeBoundOk(int channelId, int replyCode, AMQShortString replyText); + void receiveExchangeBound(int channelId, AMQShortString exchange, AMQShortString routingKey, AMQShortString queue); - T queueBindOk(int channelId); + void receiveExchangeBoundOk(int channelId, int replyCode, AMQShortString replyText); - T queueUnbindOk(final int channelId); + void receiveQueueBindOk(int channelId); - T queueDeclare(int channelId, - AMQShortString queue, - boolean passive, - boolean durable, - boolean exclusive, - boolean autoDelete, boolean nowait, FieldTable arguments); + void receiveQueueUnbindOk(final int channelId); - T queueDeclareOk(int channelId, final AMQShortString queue, long messageCount, long consumerCount); + void receiveQueueDeclare(int channelId, + AMQShortString queue, + boolean passive, + boolean durable, + boolean exclusive, + boolean autoDelete, boolean nowait, FieldTable arguments); - T queueBind(int channelId, - AMQShortString queue, - AMQShortString exchange, - AMQShortString bindingKey, - boolean nowait, FieldTable arguments); + void receiveQueueDeclareOk(int channelId, final AMQShortString queue, long messageCount, long consumerCount); - T queuePurge(int channelId, AMQShortString queue, boolean nowait); + void receiveQueueBind(int channelId, + AMQShortString queue, + AMQShortString exchange, + AMQShortString bindingKey, + boolean nowait, FieldTable arguments); - T queuePurgeOk(int channelId, long messageCount); + void receiveQueuePurge(int channelId, AMQShortString queue, boolean nowait); - T queueDelete(int channelId, AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait); + void receiveQueuePurgeOk(int channelId, long messageCount); - T queueDeleteOk(int channelId, long messageCount); + void receiveQueueDelete(int channelId, AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait); - T queueUnbind(int channelId, - AMQShortString queue, - AMQShortString exchange, - AMQShortString bindingKey, - FieldTable arguments); + void receiveQueueDeleteOk(int channelId, long messageCount); - T basicRecoverSyncOk(int channelId); + void receiveQueueUnbind(int channelId, + AMQShortString queue, + AMQShortString exchange, + AMQShortString bindingKey, + FieldTable arguments); - T basicRecover(int channelId, final boolean requeue, boolean sync); + void receiveBasicRecoverSyncOk(int channelId); - T basicQos(int channelId, long prefetchSize, int prefetchCount, boolean global); + void receiveBasicRecover(int channelId, final boolean requeue, boolean sync); - T basicQosOk(int channelId); + void receiveBasicQos(int channelId, long prefetchSize, int prefetchCount, boolean global); - T basicConsume(int channelId, - AMQShortString queue, - AMQShortString consumerTag, - boolean noLocal, - boolean noAck, - boolean exclusive, boolean nowait, FieldTable arguments); + void receiveBasicQosOk(int channelId); - T basicConsumeOk(int channelId, AMQShortString consumerTag); + void receiveBasicConsume(int channelId, + AMQShortString queue, + AMQShortString consumerTag, + boolean noLocal, + boolean noAck, + boolean exclusive, boolean nowait, FieldTable arguments); - T basicCancel(int channelId, AMQShortString consumerTag, boolean noWait); + void receiveBasicConsumeOk(int channelId, AMQShortString consumerTag); - T basicCancelOk(int channelId, AMQShortString consumerTag); + void receiveBasicCancel(int channelId, AMQShortString consumerTag, boolean noWait); - T basicPublish(int channelId, - AMQShortString exchange, - AMQShortString routingKey, - boolean mandatory, - boolean immediate); + void receiveBasicCancelOk(int channelId, AMQShortString consumerTag); - T basicReturn(final int channelId, - int replyCode, - AMQShortString replyText, - AMQShortString exchange, - AMQShortString routingKey); + void receiveBasicPublish(int channelId, + AMQShortString exchange, + AMQShortString routingKey, + boolean mandatory, + boolean immediate); - T basicDeliver(int channelId, - AMQShortString consumerTag, - long deliveryTag, - boolean redelivered, - AMQShortString exchange, AMQShortString routingKey); + void receiveBasicReturn(final int channelId, + int replyCode, + AMQShortString replyText, + AMQShortString exchange, + AMQShortString routingKey); - T basicGet(int channelId, AMQShortString queue, boolean noAck); + void receiveBasicDeliver(int channelId, + AMQShortString consumerTag, + long deliveryTag, + boolean redelivered, + AMQShortString exchange, AMQShortString routingKey); - T basicGetOk(int channelId, - long deliveryTag, - boolean redelivered, - AMQShortString exchange, - AMQShortString routingKey, long messageCount); + void receiveBasicGet(int channelId, AMQShortString queue, boolean noAck); - T basicGetEmpty(int channelId); + void receiveBasicGetOk(int channelId, + long deliveryTag, + boolean redelivered, + AMQShortString exchange, + AMQShortString routingKey, long messageCount); - T basicAck(int channelId, long deliveryTag, boolean multiple); + void receiveBasicGetEmpty(int channelId); - T basicReject(int channelId, long deliveryTag, boolean requeue); + void receiveBasicAck(int channelId, long deliveryTag, boolean multiple); - T heartbeat(); + void receiveBasicReject(int channelId, long deliveryTag, boolean requeue); - T messageContent(int channelId, byte[] data); + void receiveHeartbeat(); - T messageHeader(int channelId, BasicContentHeaderProperties properties, long bodySize); + void receiveMessageContent(int channelId, byte[] data); + + void receiveMessageHeader(int channelId, BasicContentHeaderProperties properties, long bodySize); + + void receiveProtocolHeader(ProtocolInitiation protocolInitiation); } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodRegistry.java Fri Oct 10 09:54:36 2014 @@ -31,14 +31,12 @@ package org.apache.qpid.framing; public final class MethodRegistry { - private final FrameCreatingMethodProcessor _methodProcessor; private ProtocolVersion _protocolVersion; public MethodRegistry(ProtocolVersion pv) { _protocolVersion = pv; - _methodProcessor = new FrameCreatingMethodProcessor(this); } public void setProtocolVersion(final ProtocolVersion protocolVersion) @@ -555,10 +553,5 @@ public final class MethodRegistry return _protocolVersion; } - public FrameCreatingMethodProcessor getMethodProcessor() - { - return _methodProcessor; - } - } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueBindBody.java Fri Oct 10 09:54:36 2014 @@ -165,9 +165,9 @@ public class QueueBindBody extends AMQMe return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException + final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int ticket = buffer.readUnsignedShort(); @@ -176,6 +176,6 @@ public class QueueBindBody extends AMQMe AMQShortString bindingKey = buffer.readAMQShortString(); boolean nowait = (buffer.readByte() & 0x01) == 0x01; FieldTable arguments = EncodingUtils.readFieldTable(buffer); - return dispatcher.queueBind(channelId, queue, exchange, bindingKey, nowait, arguments); + dispatcher.receiveQueueBind(channelId, queue, exchange, bindingKey, nowait, arguments); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java Fri Oct 10 09:54:36 2014 @@ -191,9 +191,9 @@ public class QueueDeclareBody extends AM return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException + final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int ticket = buffer.readUnsignedShort(); @@ -206,6 +206,6 @@ public class QueueDeclareBody extends AM boolean autoDelete = (bitfield & 0x08 ) == 0x08; boolean nowait = (bitfield & 0x010 ) == 0x010; FieldTable arguments = EncodingUtils.readFieldTable(buffer); - return dispatcher.queueDeclare(channelId, queue, passive, durable, exclusive, autoDelete, nowait, arguments); + dispatcher.receiveQueueDeclare(channelId, queue, passive, durable, exclusive, autoDelete, nowait, arguments); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java Fri Oct 10 09:54:36 2014 @@ -120,13 +120,13 @@ public class QueueDeclareOkBody extends return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { AMQShortString queue = buffer.readAMQShortString(); long messageCount = EncodingUtils.readUnsignedInteger(buffer); long consumerCount = EncodingUtils.readUnsignedInteger(buffer); - return dispatcher.queueDeclareOk(channelId, queue, messageCount, consumerCount); + dispatcher.receiveQueueDeclareOk(channelId, queue, messageCount, consumerCount); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java Fri Oct 10 09:54:36 2014 @@ -151,9 +151,9 @@ public class QueueDeleteBody extends AMQ return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); @@ -163,6 +163,6 @@ public class QueueDeleteBody extends AMQ boolean ifUnused = (bitfield & 0x01) == 0x01; boolean ifEmpty = (bitfield & 0x02) == 0x02; boolean nowait = (bitfield & 0x04) == 0x04; - return dispatcher.queueDelete(channelId, queue, ifUnused, ifEmpty, nowait); + dispatcher.receiveQueueDelete(channelId, queue, ifUnused, ifEmpty, nowait); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java Fri Oct 10 09:54:36 2014 @@ -95,11 +95,11 @@ public class QueueDeleteOkBody extends A return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { long messageCount = EncodingUtils.readUnsignedInteger(buffer); - return dispatcher.queueDeleteOk(channelId, messageCount); + dispatcher.receiveQueueDeleteOk(channelId, messageCount); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java Fri Oct 10 09:54:36 2014 @@ -125,14 +125,14 @@ public class QueuePurgeBody extends AMQM return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { int ticket = buffer.readUnsignedShort(); AMQShortString queue = buffer.readAMQShortString(); boolean nowait = (buffer.readByte() & 0x01) == 0x01; - return dispatcher.queuePurge(channelId, queue, nowait); + dispatcher.receiveQueuePurge(channelId, queue, nowait); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java Fri Oct 10 09:54:36 2014 @@ -95,11 +95,11 @@ public class QueuePurgeOkBody extends AM return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException + final MethodProcessor dispatcher) throws IOException { long messageCount = EncodingUtils.readUnsignedInteger(buffer); - return dispatcher.queuePurgeOk(channelId, messageCount); + dispatcher.receiveQueuePurgeOk(channelId, messageCount); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java Fri Oct 10 09:54:36 2014 @@ -147,9 +147,9 @@ public class QueueUnbindBody extends AMQ return buf.toString(); } - public static <T> T process(final int channelId, + public static void process(final int channelId, final MarkableDataInput buffer, - final MethodProcessor<T> dispatcher) throws IOException, AMQFrameDecodingException + final MethodProcessor dispatcher) throws IOException, AMQFrameDecodingException { int ticket = buffer.readUnsignedShort(); @@ -157,6 +157,6 @@ public class QueueUnbindBody extends AMQ AMQShortString exchange = buffer.readAMQShortString(); AMQShortString routingKey = buffer.readAMQShortString(); FieldTable arguments = EncodingUtils.readFieldTable(buffer); - return dispatcher.queueUnbind(channelId, queue, exchange, routingKey, arguments); + dispatcher.receiveQueueUnbind(channelId, queue, exchange, routingKey, arguments); } } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java Fri Oct 10 09:54:36 2014 @@ -25,7 +25,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; +import java.util.List; import junit.framework.TestCase; @@ -33,19 +33,21 @@ import org.apache.qpid.framing.AMQDataBl import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQFrameDecodingException; import org.apache.qpid.framing.AMQProtocolVersionException; +import org.apache.qpid.framing.FrameCreatingMethodProcessor; import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.ProtocolVersion; public class AMQDecoderTest extends TestCase { private AMQDecoder _decoder; + private FrameCreatingMethodProcessor _methodProcessor; public void setUp() { - _decoder = new AMQDecoder(false, new MethodRegistry(ProtocolVersion.v0_91)); + _methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91); + _decoder = new AMQDecoder(false, _methodProcessor); } @@ -59,7 +61,8 @@ public class AMQDecoderTest extends Test public void testSingleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException, IOException { ByteBuffer msg = getHeartbeatBodyBuffer(); - ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg); + _decoder.decodeBuffer(msg); + List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods(); if (frames.get(0) instanceof AMQFrame) { assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType()); @@ -79,9 +82,12 @@ public class AMQDecoderTest extends Test msgA.limit(msgaLimit); msg.position(msgbPos); ByteBuffer msgB = msg.slice(); - ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msgA); + + _decoder.decodeBuffer(msgA); + List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods(); assertEquals(0, frames.size()); - frames = _decoder.decodeBuffer(msgB); + + _decoder.decodeBuffer(msgB); assertEquals(1, frames.size()); if (frames.get(0) instanceof AMQFrame) { @@ -101,7 +107,8 @@ public class AMQDecoderTest extends Test msg.put(msgA); msg.put(msgB); msg.flip(); - ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg); + _decoder.decodeBuffer(msg); + List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods(); assertEquals(2, frames.size()); for (AMQDataBlock frame : frames) { @@ -138,12 +145,15 @@ public class AMQDecoderTest extends Test sliceB.put(msgC); sliceB.flip(); msgC.limit(limit); - - ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(sliceA); + + _decoder.decodeBuffer(sliceA); + List<AMQDataBlock> frames = _methodProcessor.getProcessedMethods(); assertEquals(1, frames.size()); - frames = _decoder.decodeBuffer(sliceB); + frames.clear(); + _decoder.decodeBuffer(sliceB); assertEquals(1, frames.size()); - frames = _decoder.decodeBuffer(msgC); + frames.clear(); + _decoder.decodeBuffer(msgC); assertEquals(1, frames.size()); for (AMQDataBlock frame : frames) { Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java?rev=1630745&r1=1630744&r2=1630745&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java Fri Oct 10 09:54:36 2014 @@ -27,7 +27,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -41,6 +40,7 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; +import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQDataBlockDecoder; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQFrameDecodingException; @@ -51,7 +51,7 @@ import org.apache.qpid.framing.Connectio import org.apache.qpid.framing.ConnectionStartOkBody; import org.apache.qpid.framing.ConnectionTuneOkBody; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.FrameCreatingMethodProcessor; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.server.model.AuthenticationProvider; @@ -110,11 +110,11 @@ public class MaxFrameSizeTest extends Qp { @Override - public void evaluate(final Socket socket, final List<AMQFrame> frames) + public void evaluate(final Socket socket, final List<AMQDataBlock> frames) { if(!socket.isClosed()) { - AMQFrame lastFrame = frames.get(frames.size() - 1); + AMQFrame lastFrame = (AMQFrame) frames.get(frames.size() - 1); assertTrue("Connection should not be possible with a frame size < " + Constant.MIN_MAX_FRAME_SIZE, lastFrame.getBodyFrame() instanceof ConnectionCloseBody); } } @@ -159,11 +159,11 @@ public class MaxFrameSizeTest extends Qp { @Override - public void evaluate(final Socket socket, final List<AMQFrame> frames) + public void evaluate(final Socket socket, final List<AMQDataBlock> frames) { if(!socket.isClosed()) { - AMQFrame lastFrame = frames.get(frames.size() - 1); + AMQFrame lastFrame = (AMQFrame) frames.get(frames.size() - 1); assertTrue("Connection should not be possible with a frame size larger than the broker requested", lastFrame.getBodyFrame() instanceof ConnectionCloseBody); } } @@ -173,7 +173,7 @@ public class MaxFrameSizeTest extends Qp private static interface ResultEvaluator { - void evaluate(Socket socket, List<AMQFrame> frames); + void evaluate(Socket socket, List<AMQDataBlock> frames); } private void doAMQP08test(int frameSize, ResultEvaluator evaluator) @@ -236,17 +236,14 @@ public class MaxFrameSizeTest extends Qp byte[] serverData = baos.toByteArray(); ByteArrayDataInput badi = new ByteArrayDataInput(serverData); AMQDataBlockDecoder datablockDecoder = new AMQDataBlockDecoder(); - final MethodRegistry methodRegistry_0_91 = new MethodRegistry(ProtocolVersion.v0_91); + final FrameCreatingMethodProcessor methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91); - List<AMQFrame> frames = new ArrayList<>(); while (datablockDecoder.decodable(badi)) { - frames.add(datablockDecoder.createAndPopulateFrame(methodRegistry_0_91.getProtocolVersion(), - methodRegistry_0_91.getMethodProcessor(), - badi)); + datablockDecoder.processInput(methodProcessor, badi); } - evaluator.evaluate(socket, frames); + evaluator.evaluate(socket, methodProcessor.getProcessedMethods()); } private static class TestClientDelegate extends ClientDelegate --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org