This is an automated email from the ASF dual-hosted git repository. robbie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton-j.git
The following commit(s) were added to refs/heads/master by this push: new 0ca7831 PROTON-1998: add trace output for the AMQP and SASL headers being sent+received 0ca7831 is described below commit 0ca7831d222cce58f21fa368b2005a49eb3707a1 Author: Robbie Gemmell <rob...@apache.org> AuthorDate: Thu Feb 28 17:50:40 2019 +0000 PROTON-1998: add trace output for the AMQP and SASL headers being sent+received --- .../qpid/proton/engine/impl/FrameParser.java | 19 ++- .../qpid/proton/engine/impl/ProtocolTracer.java | 4 + .../qpid/proton/engine/impl/SaslFrameParser.java | 23 ++- .../apache/qpid/proton/engine/impl/SaslImpl.java | 19 ++- .../qpid/proton/engine/impl/TransportImpl.java | 25 ++- .../qpid/proton/engine/impl/FrameParserTest.java | 2 +- .../proton/engine/impl/SaslFrameParserTest.java | 10 +- .../qpid/proton/engine/impl/TransportImplTest.java | 187 ++++++++++++++++++++- 8 files changed, 278 insertions(+), 11 deletions(-) diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java index e3b3c55..ce4283e 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java @@ -40,6 +40,7 @@ import org.apache.qpid.proton.framing.TransportFrame; class FrameParser implements TransportInput { private static final Logger TRACE_LOGGER = Logger.getLogger("proton.trace"); + private static final String HEADER_DESCRIPTION = "AMQP"; private static final ByteBuffer _emptyInputBuffer = newWriteableBuffer(0); @@ -67,6 +68,7 @@ class FrameParser implements TransportInput private final ByteBufferDecoder _decoder; private final int _inputBufferSize; private final int _localMaxFrameSize; + private final TransportImpl _transport; private ByteBuffer _inputBuffer = null; private boolean _tail_closed = false; @@ -89,12 +91,13 @@ class FrameParser implements TransportInput * We store the last result when processing input so that * we know not to process any more input if it was an error. */ - FrameParser(FrameHandler frameHandler, ByteBufferDecoder decoder, int localMaxFrameSize) + FrameParser(FrameHandler frameHandler, ByteBufferDecoder decoder, int localMaxFrameSize, TransportImpl transport) { _frameHandler = frameHandler; _decoder = decoder; _localMaxFrameSize = localMaxFrameSize; _inputBufferSize = _localMaxFrameSize > 0 ? _localMaxFrameSize : 16*1024; + _transport = transport; } private void input(ByteBuffer in) throws TransportException @@ -238,6 +241,9 @@ class FrameParser implements TransportInput state = State.ERROR; break; } + + logHeader(); + state = State.SIZE_0; } else @@ -583,4 +589,15 @@ class FrameParser implements TransportInput { return _framesInput; } + + private void logHeader() { + if (_transport.isFrameTracingEnabled()) { + _transport.log(TransportImpl.INCOMING, HEADER_DESCRIPTION); + + ProtocolTracer tracer = _transport.getProtocolTracer(); + if (tracer != null) { + tracer.receivedHeader(HEADER_DESCRIPTION); + } + } + } } diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ProtocolTracer.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ProtocolTracer.java index ff1468c..0b92884 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ProtocolTracer.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ProtocolTracer.java @@ -30,6 +30,10 @@ public interface ProtocolTracer { public void receivedFrame(TransportFrame transportFrame); public void sentFrame(TransportFrame transportFrame); + default void receivedSaslBody(SaslFrameBody saslFrameBody) {} default void sentSaslBody(SaslFrameBody saslFrameBody) {} + + default void receivedHeader(String header) {} + default void sentHeader(String header) {} } diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java index a6f75d5..141ec31 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java @@ -33,6 +33,8 @@ import org.apache.qpid.proton.engine.TransportException; class SaslFrameParser { + private static final String HEADER_DESCRIPTION = "SASL"; + private SaslFrameHandler _sasl; enum State @@ -62,12 +64,14 @@ class SaslFrameParser private final ByteBufferDecoder _decoder; private int _frameSizeLimit; + private TransportImpl _transport; - SaslFrameParser(SaslFrameHandler sasl, ByteBufferDecoder decoder, int frameSizeLimit) + SaslFrameParser(SaslFrameHandler sasl, ByteBufferDecoder decoder, int frameSizeLimit, TransportImpl transport) { _sasl = sasl; _decoder = decoder; _frameSizeLimit = frameSizeLimit; + _transport = transport; } /** @@ -206,6 +210,9 @@ class SaslFrameParser state = State.ERROR; break; } + + logHeader(); + state = State.SIZE_0; } else @@ -413,4 +420,18 @@ class SaslFrameParser _size = 0; _state = State.SIZE_0; } + + private void logHeader() + { + if (_transport.isFrameTracingEnabled()) + { + _transport.log(TransportImpl.INCOMING, HEADER_DESCRIPTION); + + ProtocolTracer tracer = _transport.getProtocolTracer(); + if (tracer != null) + { + tracer.receivedHeader(HEADER_DESCRIPTION); + } + } + } } diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java index 7c17e20..eea4053 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java @@ -51,6 +51,7 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>, private static final Logger _logger = Logger.getLogger(SaslImpl.class.getName()); public static final byte SASL_FRAME_TYPE = (byte) 1; + private static final String HEADER_DESCRIPTION = "SASL"; private final DecoderImpl _decoder = new DecoderImpl(); private final EncoderImpl _encoder = new EncoderImpl(_decoder); @@ -98,7 +99,7 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>, _maxFrameSize = maxFrameSize; AMQPDefinedTypes.registerAllTypes(_decoder,_encoder); - _frameParser = new SaslFrameParser(this, _decoder, maxFrameSize); + _frameParser = new SaslFrameParser(this, _decoder, maxFrameSize, _transport); _frameWriter = new FrameWriter(_encoder, maxFrameSize, FrameWriter.SASL_FRAME_TYPE, _transport); } @@ -212,6 +213,8 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>, { if(!_headerWritten) { + logHeader(); + _frameWriter.writeHeader(AmqpHeader.SASL_HEADER); _headerWritten = true; return AmqpHeader.SASL_HEADER.length; @@ -222,6 +225,20 @@ public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>, } } + private void logHeader() + { + if (_transport.isFrameTracingEnabled()) + { + _transport.log(TransportImpl.OUTGOING, HEADER_DESCRIPTION); + + ProtocolTracer tracer = _transport.getProtocolTracer(); + if (tracer != null) + { + tracer.sentHeader(HEADER_DESCRIPTION); + } + } + } + @Override public int pending() { diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java index 30e5a2a..d8bac2d 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java @@ -82,6 +82,7 @@ public class TransportImpl extends EndpointImpl private static final boolean FRM_ENABLED = getBooleanEnv("PN_TRACE_FRM"); private static final int TRACE_FRAME_PAYLOAD_LENGTH = Integer.getInteger("proton.trace_frame_payload_length", 1024); + private static final String HEADER_DESCRIPTION = "AMQP"; // trace levels private int _levels = (FRM_ENABLED ? TRACE_FRM : 0); @@ -179,7 +180,7 @@ public class TransportImpl extends EndpointImpl if(!_init) { _init = true; - _frameParser = new FrameParser(_frameHandler , _decoder, _maxFrameSize); + _frameParser = new FrameParser(_frameHandler , _decoder, _maxFrameSize, this); _inputProcessor = _frameParser; _outputProcessor = new TransportOutputAdaptor(this, _maxFrameSize, isUseReadOnlyOutputBuffer()); } @@ -865,11 +866,27 @@ public class TransportImpl extends EndpointImpl { if(!_headerWritten) { + outputHeaderDescription(); + _frameWriter.writeHeader(AmqpHeader.HEADER); _headerWritten = true; } } + private void outputHeaderDescription() + { + if (isFrameTracingEnabled()) + { + log(TransportImpl.OUTGOING, HEADER_DESCRIPTION); + + ProtocolTracer tracer = getProtocolTracer(); + if (tracer != null) + { + tracer.sentHeader(HEADER_DESCRIPTION); + } + } + } + private void processOpen() { if (!_isOpenSent && (_conditionSet || @@ -1726,6 +1743,12 @@ public class TransportImpl extends EndpointImpl } } + void log(final String event, final String headerDescription) { + if (isTraceFramesEnabled()) { + outputMessage(event, 0, headerDescription, null); + } + } + private void outputMessage(String event, int channel, Object frameBody, Binary payload) { StringBuilder msg = new StringBuilder(); diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java index 148c096..3b0e36b 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java @@ -52,7 +52,7 @@ public class FrameParserTest private FrameHandler _mockFrameHandler = mock(FrameHandler.class); private DecoderImpl _decoder = new DecoderImpl(); private EncoderImpl _encoder = new EncoderImpl(_decoder); - private final FrameParser _frameParser = new FrameParser(_mockFrameHandler, _decoder, DEFAULT_MAX_FRAME_SIZE); + private final FrameParser _frameParser = new FrameParser(_mockFrameHandler, _decoder, DEFAULT_MAX_FRAME_SIZE, new TransportImpl()); private final AmqpFramer _amqpFramer = new AmqpFramer(); diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/SaslFrameParserTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/SaslFrameParserTest.java index e766572..8e8a4dc 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/SaslFrameParserTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/SaslFrameParserTest.java @@ -47,8 +47,9 @@ public class SaslFrameParserTest { private final SaslFrameHandler _mockSaslFrameHandler = mock(SaslFrameHandler.class); private final ByteBufferDecoder _mockDecoder = mock(ByteBufferDecoder.class); + private final TransportImpl mockTransport = mock(TransportImpl.class); private final SaslFrameParser _frameParser; - private final SaslFrameParser _frameParserWithMockDecoder = new SaslFrameParser(_mockSaslFrameHandler, _mockDecoder, Transport.MIN_MAX_FRAME_SIZE); + private final SaslFrameParser _frameParserWithMockDecoder = new SaslFrameParser(_mockSaslFrameHandler, _mockDecoder, Transport.MIN_MAX_FRAME_SIZE, mockTransport); private final AmqpFramer _amqpFramer = new AmqpFramer(); private final SaslInit _saslFrameBody; @@ -60,7 +61,7 @@ public class SaslFrameParserTest EncoderImpl encoder = new EncoderImpl(decoder); AMQPDefinedTypes.registerAllTypes(decoder,encoder); - _frameParser = new SaslFrameParser(_mockSaslFrameHandler, decoder, Transport.MIN_MAX_FRAME_SIZE); + _frameParser = new SaslFrameParser(_mockSaslFrameHandler, decoder, Transport.MIN_MAX_FRAME_SIZE, mockTransport); _saslFrameBody = new SaslInit(); _saslFrameBody.setMechanism(Symbol.getSymbol("unused")); _saslFrameBytes = ByteBuffer.wrap(_amqpFramer.generateSaslFrame(0, new byte[0], _saslFrameBody)); @@ -106,7 +107,7 @@ public class SaslFrameParserTest @Test public void testInputOfFrameWithInvalidSizeWhenSpecifyingLargeMaxFrameSize() { - SaslFrameParser frameParserWithLargeMaxSize = new SaslFrameParser(_mockSaslFrameHandler, _mockDecoder, 2017); + SaslFrameParser frameParserWithLargeMaxSize = new SaslFrameParser(_mockSaslFrameHandler, _mockDecoder, 2017, mockTransport); sendAmqpSaslHeader(frameParserWithLargeMaxSize); // http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-security-v1.0-os.html#doc-idp43536 @@ -248,8 +249,9 @@ public class SaslFrameParserTest private void doInputOfInvalidHeaderTestImpl(int invalidIndex) { SaslFrameHandler mockSaslFrameHandler = mock(SaslFrameHandler.class); ByteBufferDecoder mockDecoder = mock(ByteBufferDecoder.class); + TransportImpl mockTransport = mock(TransportImpl.class); - SaslFrameParser saslFrameParser = new SaslFrameParser(mockSaslFrameHandler, mockDecoder, Transport.MIN_MAX_FRAME_SIZE); + SaslFrameParser saslFrameParser = new SaslFrameParser(mockSaslFrameHandler, mockDecoder, Transport.MIN_MAX_FRAME_SIZE, mockTransport); byte[] header = Arrays.copyOf(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER.length); header[invalidIndex] = 'X'; diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java index 3f0da6c..8b81355 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java @@ -39,6 +39,7 @@ import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Random; +import java.util.concurrent.atomic.AtomicReference; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Binary; @@ -3643,7 +3644,7 @@ public class TransportImplTest } @Test - public void testProtocolTracingLogsToTracer() + public void testProtocolTracingLogsFrameToTracer() { Connection connection = new ConnectionImpl(); List<TransportFrame> frames = new ArrayList<>(); @@ -3673,7 +3674,8 @@ public class TransportImplTest } @Test - public void testProtocolTracingLogsToSystem() { + public void testProtocolTracingLogsFrameToSystem() + { Connection connection = new ConnectionImpl(); TransportImpl spy = spy(_transport); @@ -3691,4 +3693,185 @@ public class TransportImplTest assertTrue(frameCatcher.getValue().getBody() instanceof Open); assertNull(frameCatcher.getValue().getPayload()); } + + @Test + public void testProtocolTracingLogsHeaderToTracer() + { + doProtocolTracingLogsHeaderToTracerTestImpl(false); + } + + @Test + public void testProtocolTracingLogsHeaderSaslToTracer() + { + doProtocolTracingLogsHeaderToTracerTestImpl(true); + } + + private void doProtocolTracingLogsHeaderToTracerTestImpl(boolean sasl) + { + Connection connection = new ConnectionImpl(); + AtomicReference<String> headerRef = new AtomicReference<>(); + _transport.setProtocolTracer(new ProtocolTracer() + { + @Override + public void receivedHeader(String header) + { + assertTrue(headerRef.compareAndSet(null, header)); + } + + @Override + public void receivedFrame(TransportFrame transportFrame) { } + @Override + public void sentFrame(TransportFrame transportFrame) { } + + }); + + if (sasl) + { + _transport.sasl(); + } + + assertTrue(_transport.isHandlingFrames()); + _transport.bind(connection); + + assertTrue(_transport.isHandlingFrames()); + _transport.getInputBuffer().put(sasl ? AmqpHeader.SASL_HEADER : AmqpHeader.HEADER); + _transport.process(); + assertTrue(_transport.isHandlingFrames()); + + assertNotNull(headerRef.get()); + assertEquals(sasl ? "SASL" : "AMQP", headerRef.get()); + } + + @Test + public void testProtocolTracingLogsHeaderToSystem() + { + doProtocolTracingLogsHeaderToSystemTestImpl(false); + } + + @Test + public void testProtocolTracingLogsHeaderSaslToSystem() + { + doProtocolTracingLogsHeaderToSystemTestImpl(true); + } + + private void doProtocolTracingLogsHeaderToSystemTestImpl(boolean sasl) + { + Connection connection = new ConnectionImpl(); + + AtomicReference<String> headerRef = new AtomicReference<>(); + AtomicReference<String> eventRef = new AtomicReference<>(); + TransportImpl transport = new TransportImpl() + { + @Override + public void log(String event, String header) { + assertTrue(eventRef.compareAndSet(null, event)); + assertTrue(headerRef.compareAndSet(null, header)); + } + }; + transport.trace(2); + + if (sasl) + { + transport.sasl(); + } + + transport.bind(connection); + + transport.getInputBuffer().put(sasl ? AmqpHeader.SASL_HEADER : AmqpHeader.HEADER); + transport.process(); + + assertEquals(TransportImpl.INCOMING, eventRef.get()); + assertEquals(sasl ? "SASL" : "AMQP", headerRef.get()); + } + + @Test + public void testProtocolTracingLogsOutboundHeaderToTracer() + { + doProtocolTracingLogsOutboundHeaderToTracerTestImpl(false); + } + + @Test + public void testProtocolTracingLogsOutboundHeaderSaslToTracer() + { + doProtocolTracingLogsOutboundHeaderToTracerTestImpl(true); + } + + private void doProtocolTracingLogsOutboundHeaderToTracerTestImpl(boolean sasl) + { + Connection connection = new ConnectionImpl(); + AtomicReference<String> headerRef = new AtomicReference<>(); + _transport.setProtocolTracer(new ProtocolTracer() + { + @Override + public void sentHeader(String header) + { + assertTrue(headerRef.compareAndSet(null, header)); + } + + @Override + public void receivedFrame(TransportFrame transportFrame) { } + @Override + public void sentFrame(TransportFrame transportFrame) { } + + }); + + if (sasl) + { + _transport.sasl(); + } + + _transport.bind(connection); + + ByteBuffer expected = ByteBuffer.wrap(sasl ? AmqpHeader.SASL_HEADER : AmqpHeader.HEADER); + + _transport.pending(); + assertEquals(expected, _transport.getOutputBuffer()); + + assertEquals(sasl ? "SASL" : "AMQP", headerRef.get()); + } + + @Test + public void testProtocolTracingLogsOutboundHeaderToSystem() + { + doProtocolTracingLogsOutboundHeaderToSystemTestImpl(false); + } + + @Test + public void testProtocolTracingLogsOutboundHeaderSaslToSystem() + { + doProtocolTracingLogsOutboundHeaderToSystemTestImpl(true); + } + + private void doProtocolTracingLogsOutboundHeaderToSystemTestImpl(boolean sasl) + { + Connection connection = new ConnectionImpl(); + + AtomicReference<String> headerRef = new AtomicReference<>(); + AtomicReference<String> eventRef = new AtomicReference<>(); + TransportImpl transport = new TransportImpl() + { + @Override + public void log(String event, String header) + { + assertTrue(eventRef.compareAndSet(null, event)); + assertTrue(headerRef.compareAndSet(null, header)); + } + }; + transport.trace(2); + + if (sasl) + { + transport.sasl(); + } + + transport.bind(connection); + + ByteBuffer expected = ByteBuffer.wrap(sasl ? AmqpHeader.SASL_HEADER : AmqpHeader.HEADER); + + transport.pending(); + assertEquals(expected, transport.getOutputBuffer()); + + assertEquals(TransportImpl.OUTGOING, eventRef.get()); + assertEquals(sasl ? "SASL" : "AMQP", headerRef.get()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org