Author: rgodfrey Date: Mon Oct 13 10:50:37 2014 New Revision: 1631344 URL: http://svn.apache.org/r1631344 Log: Add logging
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1631344&r1=1631343&r2=1631344&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Mon Oct 13 10:50:37 2014 @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.protocol.v0_8; +import static org.apache.qpid.transport.util.Functions.hex; + import java.nio.ByteBuffer; import java.security.AccessControlException; import java.security.PrivilegedAction; @@ -1855,6 +1857,15 @@ public class AMQChannel final boolean passive, final boolean active, final boolean write, final boolean read) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] AccessRequest[" +" realm: " + realm + + " exclusive: " + exclusive + + " passive: " + passive + + " active: " + active + + " write: " + write + " read: " + read + " ]"); + } + MethodRegistry methodRegistry = _connection.getMethodRegistry(); if (ProtocolVersion.v0_91.equals(_connection.getProtocolVersion())) @@ -1876,12 +1887,23 @@ public class AMQChannel @Override public void receiveBasicAck(final long deliveryTag, final boolean multiple) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] BasicAck[" +" deliveryTag: " + deliveryTag + " multiple: " + multiple + " ]"); + } + acknowledgeMessage(deliveryTag, multiple); } @Override public void receiveBasicCancel(final AMQShortString consumerTag, final boolean nowait) { + + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] BasicCancel[" +" consumerTag: " + consumerTag + " noWait: " + nowait + " ]"); + } + unsubscribeConsumer(consumerTag); if (!nowait) { @@ -1899,6 +1921,16 @@ public class AMQChannel final boolean noAck, final boolean exclusive, final boolean nowait, final FieldTable arguments) { + + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] BasicConsume[" +" queue: " + queue + + " consumerTag: " + consumerTag + + " noLocal: " + noLocal + + " noAck: " + noAck + + " exclusive: " + exclusive + " nowait: " + nowait + " arguments: " + arguments + " ]"); + } + AMQShortString consumerTag1 = consumerTag; VirtualHostImpl<?, ?, ?> vHost = _connection.getVirtualHost(); sync(); @@ -2020,6 +2052,11 @@ public class AMQChannel @Override public void receiveBasicGet(final AMQShortString queueName, final boolean noAck) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] BasicGet[" +" queue: " + queueName + " noAck: " + noAck + " ]"); + } + VirtualHostImpl vHost = _connection.getVirtualHost(); sync(); AMQQueue queue = queueName == null ? getDefaultQueue() : vHost.getQueue(queueName.toString()); @@ -2080,6 +2117,14 @@ public class AMQChannel final boolean mandatory, final boolean immediate) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] BasicPublish[" +" exchange: " + exchangeName + + " routingKey: " + routingKey + + " mandatory: " + mandatory + + " immediate: " + immediate + " ]"); + } + VirtualHostImpl vHost = _connection.getVirtualHost(); MessageDestination destination; @@ -2121,6 +2166,11 @@ public class AMQChannel @Override public void receiveBasicQos(final long prefetchSize, final int prefetchCount, final boolean global) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] BasicQos[" +" prefetchSize: " + prefetchSize + " prefetchCount: " + prefetchCount + " global: " + global + " ]"); + } + sync(); setCredit(prefetchSize, prefetchCount); @@ -2133,6 +2183,11 @@ public class AMQChannel @Override public void receiveBasicRecover(final boolean requeue, final boolean sync) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] BasicRecover[" + " requeue: " + requeue + " sync: " + sync + " ]"); + } + resend(); if (sync) @@ -2149,6 +2204,11 @@ public class AMQChannel @Override public void receiveBasicReject(final long deliveryTag, final boolean requeue) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] BasicReject[" +" deliveryTag: " + deliveryTag + " requeue: " + requeue + " ]"); + } + MessageInstance message = getUnacknowledgedMessageMap().get(deliveryTag); if (message == null) @@ -2228,6 +2288,12 @@ public class AMQChannel final int classId, final int methodId) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] ChannelClose[" +" replyCode: " + replyCode + " replyText: " + replyText + " classId: " + classId + " methodId: " + methodId + " ]"); + } + + sync(); _connection.closeChannel(this); @@ -2238,12 +2304,21 @@ public class AMQChannel @Override public void receiveChannelCloseOk() { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] ChannelCloseOk"); + } + _connection.closeChannelOk(getChannelId()); } @Override public void receiveMessageContent(final byte[] data) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] MessageContent[" +" data: " + hex(data,_connection.getBinaryDataLimit()) + " ] "); + } if(hasCurrentMessage()) { @@ -2260,6 +2335,11 @@ public class AMQChannel @Override public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] MessageHeader[ properties: {" + properties + "} bodySize: " + bodySize + " ]"); + } + if(hasCurrentMessage()) { publishContentHeader(new ContentHeaderBody(properties, bodySize)); @@ -2281,6 +2361,12 @@ public class AMQChannel @Override public void receiveChannelFlow(final boolean active) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] ChannelFlow[" +" active: " + active + " ]"); + } + + sync(); setSuspended(!active); @@ -2293,6 +2379,11 @@ public class AMQChannel @Override public void receiveChannelFlowOk(final boolean active) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] ChannelFlowOk[" +" active: " + active + " ]"); + } + // TODO - should we do anything here? } @@ -2301,6 +2392,12 @@ public class AMQChannel final AMQShortString routingKey, final AMQShortString queueName) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] ExchangeBound[" +" exchange: " + exchangeName + " routingKey: " + + routingKey + " queue: " + queueName + " ]"); + } + VirtualHostImpl virtualHost = _connection.getVirtualHost(); MethodRegistry methodRegistry = _connection.getMethodRegistry(); @@ -2476,6 +2573,16 @@ public class AMQChannel final boolean nowait, final FieldTable arguments) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] ExchangeDeclare[" +" exchange: " + exchangeName + + " type: " + type + + " passive: " + passive + + " durable: " + durable + + " autoDelete: " + autoDelete + + " internal: " + internal + " nowait: " + nowait + " arguments: " + arguments + " ]"); + } + ExchangeImpl exchange; VirtualHostImpl<?, ?, ?> virtualHost = _connection.getVirtualHost(); if (isDefaultExchange(exchangeName)) @@ -2620,6 +2727,12 @@ public class AMQChannel @Override public void receiveExchangeDelete(final AMQShortString exchangeStr, final boolean ifUnused, final boolean nowait) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] ExchangeDelete[" +" exchange: " + exchangeStr + " ifUnused: " + ifUnused + " nowait: " + nowait + " ]"); + } + + VirtualHostImpl virtualHost = _connection.getVirtualHost(); sync(); try @@ -2673,6 +2786,14 @@ public class AMQChannel final boolean nowait, final FieldTable argumentsTable) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] QueueBind[" +" queue: " + queueName + + " exchange: " + exchange + + " bindingKey: " + routingKey + + " nowait: " + nowait + " arguments: " + argumentsTable + " ]"); + } + VirtualHostImpl virtualHost = _connection.getVirtualHost(); AMQQueue<?> queue; if (queueName == null) @@ -2777,6 +2898,15 @@ public class AMQChannel final boolean nowait, final FieldTable arguments) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] QueueDeclare[" +" queue: " + queueStr + + " passive: " + passive + + " durable: " + durable + + " exclusive: " + exclusive + + " autoDelete: " + autoDelete + " nowait: " + nowait + " arguments: " + arguments + " ]"); + } + VirtualHostImpl virtualHost = _connection.getVirtualHost(); final AMQShortString queueName; @@ -2966,6 +3096,11 @@ public class AMQChannel final boolean ifEmpty, final boolean nowait) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] QueueDelete[" +" queue: " + queueName + " ifUnused: " + ifUnused + " ifEmpty: " + ifEmpty + " nowait: " + nowait + " ]"); + } + VirtualHostImpl virtualHost = _connection.getVirtualHost(); sync(); AMQQueue queue; @@ -3028,6 +3163,11 @@ public class AMQChannel @Override public void receiveQueuePurge(final AMQShortString queueName, final boolean nowait) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] QueuePurge[" +" queue: " + queueName + " nowait: " + nowait + " ]"); + } + VirtualHostImpl virtualHost = _connection.getVirtualHost(); AMQQueue queue = null; if (queueName == null && (queue = getDefaultQueue()) == null) @@ -3073,6 +3213,14 @@ public class AMQChannel final AMQShortString routingKey, final FieldTable arguments) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] QueueUnbind[" +" queue: " + queueName + + " exchange: " + exchange + + " bindingKey: " + routingKey + + " arguments: " + arguments + " ]"); + } + VirtualHostImpl virtualHost = _connection.getVirtualHost(); @@ -3132,6 +3280,11 @@ public class AMQChannel @Override public void receiveTxSelect() { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] TxSelect"); + } + setLocalTransactional(); MethodRegistry methodRegistry = _connection.getMethodRegistry(); @@ -3143,6 +3296,12 @@ public class AMQChannel @Override public void receiveTxCommit() { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] TxCommit"); + } + + if (!isTransactional()) { closeChannel(AMQConstant.COMMAND_INVALID, @@ -3165,6 +3324,11 @@ public class AMQChannel @Override public void receiveTxRollback() { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + _channelId + "] TxRollback"); + } + if (!isTransactional()) { closeChannel(AMQConstant.COMMAND_INVALID, Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1631344&r1=1631343&r2=1631344&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Mon Oct 13 10:50:37 2014 @@ -99,6 +99,8 @@ public class AMQProtocolEngine implement // channels. This value must be of the form 2^x - 1. private static final int CHANNEL_CACHE_SIZE = 0xff; private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024; + public static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength"; + public static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80; private final Port<?> _port; private final long _creationTime; @@ -180,6 +182,7 @@ public class AMQProtocolEngine implement private int _messageCompressionThreshold; private int _currentClassId; private int _currentMethodId; + private int _binaryDataLimit; public AMQProtocolEngine(Broker broker, final NetworkConnection network, @@ -195,7 +198,9 @@ public class AMQProtocolEngine implement _decoder = new BrokerDecoder(this); _connectionID = connectionId; _logSubject = new ConnectionLogSubject(this); - + _binaryDataLimit = _broker.getContextKeys(false).contains(BROKER_DEBUG_BINARY_DATA_LENGTH) + ? _broker.getContextValue(Integer.class, BROKER_DEBUG_BINARY_DATA_LENGTH) + : DEFAULT_DEBUG_BINARY_DATA_LENGTH; _authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this)); runAsSubject(new PrivilegedAction<Void>() { @@ -1365,6 +1370,11 @@ public class AMQProtocolEngine implement @Override public void receiveChannelOpen(final int channelId) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV[" + channelId + "] ChannelOpen"); + } + // Protect the broker against out of order frame request. if (_virtualHost == null) { @@ -1405,6 +1415,11 @@ public class AMQProtocolEngine implement AMQShortString capabilities, boolean insist) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV ConnectionOpen[" +" virtualHost: " + virtualHostName + " capabilities: " + capabilities + " insist: " + insist + " ]"); + } + String virtualHostStr; if ((virtualHostName != null) && virtualHostName.charAt(0) == '/') { @@ -1462,6 +1477,11 @@ public class AMQProtocolEngine implement final int classId, final int methodId) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV ConnectionClose[" +" replyCode: " + replyCode + " replyText: " + replyText + " classId: " + classId + " methodId: " + methodId + " ]"); + } + if (_logger.isInfoEnabled()) { _logger.info("ConnectionClose received with reply code/reply text " + replyCode + "/" + @@ -1487,6 +1507,10 @@ public class AMQProtocolEngine implement @Override public void receiveConnectionCloseOk() { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV ConnectionCloseOk"); + } _logger.info("Received Connection-close-ok"); @@ -1503,6 +1527,10 @@ public class AMQProtocolEngine implement @Override public void receiveConnectionSecureOk(final byte[] response) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV ConnectionSecureOk[ response: ******** ] "); + } Broker<?> broker = getBroker(); @@ -1579,6 +1607,19 @@ public class AMQProtocolEngine implement final byte[] response, final AMQShortString locale) { + if (_logger.isDebugEnabled()) + { + _logger.debug("RECV ConnectionStartOk[" + + " clientProperties: " + + clientProperties + + " mechanism: " + + mechanism + + " response: ********" + + " locale: " + + locale + + " ]"); + } + Broker<?> broker = getBroker(); _logger.info("SASL Mechanism selected: " + mechanism); @@ -1658,6 +1699,11 @@ public class AMQProtocolEngine implement @Override public void receiveConnectionTuneOk(final int channelMax, final long frameMax, final int heartbeat) { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV ConnectionTuneOk[" +" channelMax: " + channelMax + " frameMax: " + frameMax + " heartbeat: " + heartbeat + " ]"); + } + initHeartbeats(heartbeat); int brokerFrameMax = getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); @@ -1692,6 +1738,11 @@ public class AMQProtocolEngine implement } } + public int getBinaryDataLimit() + { + return _binaryDataLimit; + } + public final class WriteDeliverMethod implements ClientDeliveryMethod { @@ -1810,12 +1861,23 @@ public class AMQProtocolEngine implement @Override public void receiveHeartbeat() { + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV Heartbeat"); + } + // No op } @Override public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation) { + + if(_logger.isDebugEnabled()) + { + _logger.debug("RECV ProtocolHeader [" + protocolInitiation + " ]"); + } + protocolInitiationReceived(protocolInitiation); } Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java?rev=1631344&r1=1631343&r2=1631344&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/BrokerDecoder.java Mon Oct 13 10:50:37 2014 @@ -26,6 +26,8 @@ import java.security.PrivilegedException import javax.security.auth.Subject; +import org.apache.log4j.Logger; + import org.apache.qpid.codec.MarkableDataInput; import org.apache.qpid.codec.ServerDecoder; import org.apache.qpid.framing.AMQFrameDecodingException; @@ -33,6 +35,7 @@ import org.apache.qpid.server.util.Serve public class BrokerDecoder extends ServerDecoder { + private static final Logger _logger = Logger.getLogger(BrokerDecoder.class); private final AMQProtocolEngine _connection; /** * Creates a new AMQP decoder. @@ -49,6 +52,11 @@ public class BrokerDecoder extends Serve protected void processFrame(final int channelId, final byte type, final long bodySize, final MarkableDataInput in) throws AMQFrameDecodingException, IOException { + long startTime = 0; + if (_logger.isDebugEnabled()) + { + startTime = System.currentTimeMillis(); + } Subject subject; AMQChannel channel = _connection.getChannel(channelId); if(channel == null) @@ -72,6 +80,11 @@ public class BrokerDecoder extends Serve return null; } }); + if(_logger.isDebugEnabled()) + { + _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms."); + } + } catch (PrivilegedActionException e) { Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java?rev=1631344&r1=1631343&r2=1631344&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java Mon Oct 13 10:50:37 2014 @@ -20,14 +20,15 @@ */ package org.apache.qpid.framing; -import org.apache.qpid.AMQException; -import org.apache.qpid.codec.MarkableDataInput; - import java.io.DataOutput; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; import java.util.Arrays; +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.MarkableDataInput; + public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock { @@ -227,7 +228,7 @@ public class ProtocolInitiation extends public String toString() { - StringBuffer buffer = new StringBuffer(new String(_protocolHeader)); + StringBuffer buffer = new StringBuffer(new String(_protocolHeader, StandardCharsets.US_ASCII)); buffer.append(Integer.toHexString(_protocolClass)); buffer.append(Integer.toHexString(_protocolInstance)); buffer.append(Integer.toHexString(_protocolMajor)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org