Repository: camel
Updated Branches:
  refs/heads/master a59700876 -> fbca40a6d


CAMEL-10296: waitForConfirmsOrDie does not return if no timeout is set
for publisher acks and guaranteed delivery


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0ce64444
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0ce64444
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0ce64444

Branch: refs/heads/master
Commit: 0ce64444371a6ded33c2aeb50a2f563fed9ec855
Parents: a597008
Author: Florian Gessner <florian.gess...@tis.biz>
Authored: Wed Sep 7 21:04:10 2016 +0200
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Thu Sep 8 09:36:18 2016 +0200

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQMessagePublisher.java     | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0ce64444/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
index 15f69ff..248df21 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
@@ -38,10 +38,11 @@ import org.slf4j.LoggerFactory;
  * A method object for publishing to RabbitMQ
  */
 public class RabbitMQMessagePublisher {
-    private static final ReturnListener GUARANTEED_DELIVERY_RETURN_LISTENER = 
new ReturnListener() {
+    private final ReturnListener GUARANTEED_DELIVERY_RETURN_LISTENER = new 
ReturnListener() {
         @Override
         public void handleReturn(int replyCode, String replyText, String 
exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) 
throws IOException {
-            throw new RuntimeCamelException("Delivery failed for exchange " + 
exchange + " and routing key " + routingKey + "; replyCode = " + replyCode + " 
replyText = " + replyText);
+            LOG.warn("Delivery failed for exchange {} and routing key {}; 
replyCode = {}; replyText = {}", exchange, routingKey, replyCode, replyText);
+            basicReturnReceived = true;
         }
     };
 
@@ -51,6 +52,8 @@ public class RabbitMQMessagePublisher {
     private final String routingKey;
     private final RabbitMQEndpoint endpoint;
     private final Message message;
+    
+    private boolean basicReturnReceived = false;
 
     public RabbitMQMessagePublisher(final Exchange camelExchange, final 
Channel channel, final String routingKey, final RabbitMQEndpoint endpoint) {
         this.camelExchange = camelExchange;
@@ -110,6 +113,7 @@ public class RabbitMQMessagePublisher {
             channel.confirmSelect();
         }
         if (endpoint.isGuaranteedDeliveries()) {
+               basicReturnReceived = false;
             channel.addReturnListener(GUARANTEED_DELIVERY_RETURN_LISTENER);
 
         }
@@ -134,6 +138,9 @@ public class RabbitMQMessagePublisher {
         try {
             LOG.debug("Waiting for publisher acknowledgements for {}ms", 
endpoint.getPublisherAcknowledgementsTimeout());
             
channel.waitForConfirmsOrDie(endpoint.getPublisherAcknowledgementsTimeout());
+            if(basicReturnReceived){
+               throw new RuntimeCamelException("Failed to deliver message; 
basic.return received");
+            }
         } catch (InterruptedException | TimeoutException e) {
             LOG.warn("Acknowledgement error for {}", camelExchange);
             throw new RuntimeCamelException(e);

Reply via email to