Repository: camel
Updated Branches:
  refs/heads/master a59700876 -> fbca40a6d

CAMEL-10296: waitForConfirmsOrDie does not return if no timeout is set for publisher acks and guaranteed delivery

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0ce64444
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0ce64444
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0ce64444

Branch: refs/heads/master
Commit: 0ce64444371a6ded33c2aeb50a2f563fed9ec855
Parents: a597008
Author: Florian Gessner <florian.gess...@tis.biz>
Authored: Wed Sep 7 21:04:10 2016 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Thu Sep 8 09:36:18 2016 +0200

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQMessagePublisher.java | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/0ce64444/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
index 15f69ff..248df21 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
@@ -38,10 +38,11 @@ import org.slf4j.LoggerFactory;
  * A method object for publishing to RabbitMQ
  */
 public class RabbitMQMessagePublisher {
-    private static final ReturnListener GUARANTEED_DELIVERY_RETURN_LISTENER = new ReturnListener() {
+    private final ReturnListener GUARANTEED_DELIVERY_RETURN_LISTENER = new ReturnListener() {
         @Override
         public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
-            throw new RuntimeCamelException("Delivery failed for exchange " + exchange + " and routing key " + routingKey + "; replyCode = " + replyCode + " replyText = " + replyText);
+            LOG.warn("Delivery failed for exchange {} and routing key {}; replyCode = {}; replyText = {}", exchange, routingKey, replyCode, replyText);
+            basicReturnReceived = true;
         }
     };
 
@@ -51,6 +52,8 @@ public class RabbitMQMessagePublisher {
     private final String routingKey;
     private final RabbitMQEndpoint endpoint;
     private final Message message;
+
+    private boolean basicReturnReceived = false;
 
     public RabbitMQMessagePublisher(final Exchange camelExchange, final Channel channel, final String routingKey, final RabbitMQEndpoint endpoint) {
         this.camelExchange = camelExchange;
@@ -110,6 +113,7 @@ public class RabbitMQMessagePublisher {
             channel.confirmSelect();
         }
         if (endpoint.isGuaranteedDeliveries()) {
+            basicReturnReceived = false;
             channel.addReturnListener(GUARANTEED_DELIVERY_RETURN_LISTENER);
         }
 
@@ -134,6 +138,9 @@ public class RabbitMQMessagePublisher {
         try {
             LOG.debug("Waiting for publisher acknowledgements for {}ms", endpoint.getPublisherAcknowledgementsTimeout());
             channel.waitForConfirmsOrDie(endpoint.getPublisherAcknowledgementsTimeout());
+            if(basicReturnReceived){
+                throw new RuntimeCamelException("Failed to deliver message; basic.return received");
+            }
         } catch (InterruptedException | TimeoutException e) {
             LOG.warn("Acknowledgement error for {}", camelExchange);
             throw new RuntimeCamelException(e);