This is an automated email from the ASF dual-hosted git repository. robbie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
The following commit(s) were added to refs/heads/master by this push: new c4ab27d QPIDJMS-481: improve handling of transport/decode errors and better signal failure cause c4ab27d is described below commit c4ab27d2b92c8d9156e138bd7486a9088165ddcb Author: Robbie Gemmell <rob...@apache.org> AuthorDate: Mon Nov 25 12:09:24 2019 +0000 QPIDJMS-481: improve handling of transport/decode errors and better signal failure cause --- .../qpid/jms/provider/amqp/AmqpProvider.java | 35 ++++ .../integration/TransactionsIntegrationTest.java | 54 ++++++ .../jms/test/testpeer/matchers/ErrorMatcher.java | 192 +++++++++++++++++++++ 3 files changed, 281 insertions(+) diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index 718019b..8880e3a 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -77,6 +77,8 @@ import org.apache.qpid.jms.transports.Transport; import org.apache.qpid.jms.transports.TransportListener; import org.apache.qpid.jms.util.PropertyUtil; import org.apache.qpid.jms.util.QpidJMSThreadFactory; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Delivery; @@ -153,6 +155,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP org.apache.qpid.proton.engine.Transport.Factory.create(); private final Collector protonCollector = new CollectorImpl(); private final Connection protonConnection = Connection.Factory.create(); + private boolean protonTransportErrorHandled; private final ProviderFutureFactory futureFactory; private AsyncResult connectionRequest; @@ -850,6 +853,11 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP TRACE_BYTES.info("Received: {}", ByteBufUtil.hexDump(input)); } + if(protonTransportErrorHandled) { + LOG.trace("Skipping data processing, proton transport previously errored."); + return; + } + do { ByteBuffer buffer = protonTransport.tail(); int chunkSize = Math.min(buffer.remaining(), input.readableBytes()); @@ -1001,6 +1009,17 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP amqpEventSink.processDeliveryUpdates(this, (Delivery) protonEvent.getContext()); } break; + case TRANSPORT_ERROR: + // We handle authentication failure elsewhere, but in doing so we close the transport + // head which would also get us here, so only action this if auth succeeded. + if(authenticator == null || (authenticator.isComplete() && authenticator.wasSuccessful())) { + protonTransportErrorHandled = true; + ErrorCondition transportCondition = protonTransport.getCondition(); + String message = extractTransportErrorMessage(transportCondition); + + throw new ProviderFailedException(message); + } + break; default: break; } @@ -1016,6 +1035,22 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP } } + private static String extractTransportErrorMessage(ErrorCondition errorCondition) { + String message = "Error without description from proton Transport"; + if (errorCondition != null) { + if (errorCondition.getDescription() != null && !errorCondition.getDescription().isEmpty()) { + message = "Error in proton Transport: " + errorCondition.getDescription(); + } + + Symbol condition = errorCondition.getCondition(); + if (condition != null) { + message = message + " [condition = " + condition + "]"; + } + } + + return message; + } + protected boolean pumpToProtonTransport() { return pumpToProtonTransport(NOOP_REQUEST, true); } diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java index 39d8f00..42dbada 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TransactionsIntegrationTest.java @@ -34,6 +34,7 @@ import java.util.ArrayDeque; import java.util.Deque; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import javax.jms.Connection; import javax.jms.JMSException; @@ -48,6 +49,7 @@ import javax.jms.TransactionRolledBackException; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsDefaultConnectionListener; import org.apache.qpid.jms.JmsOperationTimedOutException; +import org.apache.qpid.jms.exceptions.JmsConnectionFailedException; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy; import org.apache.qpid.jms.test.QpidJmsTestCase; @@ -77,6 +79,7 @@ import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.hamcrest.MatcherAssert; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; @@ -1683,4 +1686,55 @@ public class TransactionsIntegrationTest extends QpidJmsTestCase { testPeer.waitForAllHandlersToComplete(3000); } } + + @Test(timeout=20000) + public void testTransactionDeclaredDispositionWithoutTxnId() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + AtomicReference<JMSException> failure = new AtomicReference<>(); + CountDownLatch exceptionListenerFired = new CountDownLatch(1); + connection.setExceptionListener(jmse -> { + failure.compareAndSet(null, jmse); + exceptionListenerFired.countDown(); + }); + + testPeer.expectBegin(); + testPeer.expectCoordinatorAttach(); + + // Expect declare, reply declared but without a txn-id, which is illegal. + testPeer.expectDeclare(null); + + // TODO: swap this in for below after PROTON-2142 fix is available: + // ErrorMatcher errorMatcher = new ErrorMatcher() + // .withCondition(equalTo(AmqpError.DECODE_ERROR)) + // .withDescription(equalTo("The txn-id field cannot be omitted")); + // testPeer.expectClose(errorMatcher, false); + testPeer.expectClose(false); + testPeer.setSuppressReadExceptionOnClose(true); + + try { + connection.createSession(true, Session.SESSION_TRANSACTED); + fail("expected exception to be thrown"); + } catch (JMSException e) { + // Expected + } + + assertTrue("The ExceptionListener should have been alerted", exceptionListenerFired.await(3, TimeUnit.SECONDS)); + JMSException ex = failure.get(); + assertTrue("Unexpected exception type: " + ex, ex instanceof JmsConnectionFailedException); + + // TODO: swap this in for below after PROTON-2142 fix is available: + // MatcherAssert.assertThat("Unexpected exception type: ", ex.getMessage(), + // equalTo("The JMS connection has failed: Error in proton Transport: The txn-id field cannot be omitted [condition = amqp:decode-error]")); + MatcherAssert.assertThat("Unexpected exception type: ", ex.getMessage(), + equalTo("The JMS connection has failed: Error in proton Transport: org.apache.qpid.proton.engine.TransportException: " + + "org.apache.qpid.proton.codec.DecodeException: The txn-id field cannot be omitted [condition = amqp:connection:framing-error]")); + + testPeer.waitForAllHandlersToComplete(1000); + + connection.close(); // Already nuked under the covers due to txn-id being missing + } + } } diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/ErrorMatcher.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/ErrorMatcher.java new file mode 100644 index 0000000..4f90c72 --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/ErrorMatcher.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.jms.test.testpeer.matchers; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; + +import java.util.List; + +import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedLong; +import org.apache.qpid.jms.test.testpeer.AbstractFieldAndDescriptorMatcher; +import org.hamcrest.Matcher; +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; + +public class ErrorMatcher extends TypeSafeMatcher<Object> +{ + private ErrorMatcherCore coreMatcher = new ErrorMatcherCore(); + private String mismatchTextAddition; + private Object described; + private Object descriptor; + + public ErrorMatcher() + { + } + + @Override + protected boolean matchesSafely(Object received) + { + try + { + assertThat(received, instanceOf(DescribedType.class)); + descriptor = ((DescribedType)received).getDescriptor(); + if(!coreMatcher.descriptorMatches(descriptor)) + { + mismatchTextAddition = "Descriptor mismatch"; + return false; + } + + described = ((DescribedType)received).getDescribed(); + assertThat(described, instanceOf(List.class)); + @SuppressWarnings("unchecked") + List<Object> fields = (List<Object>) described; + + coreMatcher.verifyFields(fields); + } + catch (AssertionError ae) + { + mismatchTextAddition = "AssertionFailure: " + ae.getMessage(); + return false; + } + + return true; + } + + @Override + protected void describeMismatchSafely(Object item, Description mismatchDescription) + { + mismatchDescription.appendText("\nActual form: ").appendValue(item); + + mismatchDescription.appendText("\nExpected descriptor: ") + .appendValue(coreMatcher.getSymbolicDescriptor()) + .appendText(" / ") + .appendValue(coreMatcher.getNumericDescriptor()); + + if(mismatchTextAddition != null) + { + mismatchDescription.appendText("\nAdditional info: ").appendValue(mismatchTextAddition); + } + } + + public void describeTo(Description description) + { + description + .appendText("Modified which matches: ") + .appendValue(coreMatcher.getMatchers()); + } + + + public ErrorMatcher withCondition(Matcher<?> m) + { + coreMatcher.withCondition(m); + return this; + } + + public ErrorMatcher withDescription(Matcher<?> m) + { + coreMatcher.withDescription(m); + return this; + } + + public ErrorMatcher withInfo(Matcher<?> m) + { + coreMatcher.withInfo(m); + return this; + } + + public Object getReceivedCondition() + { + return coreMatcher.getReceivedCondition(); + } + + public Object getReceivedDescription() + { + return coreMatcher.getReceivedDescription(); + } + + public Object getReceivedInfo() + { + return coreMatcher.getReceivedInfo(); + } + + + + //Inner core matching class + public static class ErrorMatcherCore extends AbstractFieldAndDescriptorMatcher + { + /** Note that the ordinals of the Field enums match the order specified in the AMQP spec */ + public enum Field + { + CONDITION, + DESCRIPTION, + INFO + } + + public ErrorMatcherCore() + { + super(UnsignedLong.valueOf(0x000000000000001DL), + Symbol.valueOf("amqp:error:list")); + } + + + public ErrorMatcherCore withCondition(Matcher<?> m) + { + getMatchers().put(Field.CONDITION, m); + return this; + } + + public ErrorMatcherCore withDescription(Matcher<?> m) + { + getMatchers().put(Field.DESCRIPTION, m); + return this; + } + + public ErrorMatcherCore withInfo(Matcher<?> m) + { + getMatchers().put(Field.INFO, m); + return this; + } + + public Object getReceivedCondition() + { + return getReceivedFields().get(Field.CONDITION); + } + + public Object getReceivedDescription() + { + return getReceivedFields().get(Field.DESCRIPTION); + } + + public Object getReceivedInfo() + { + return getReceivedFields().get(Field.INFO); + } + + @Override + protected Enum<?> getField(int fieldIndex) + { + return Field.values()[fieldIndex]; + } + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org