Repository: camel
Updated Branches:
  refs/heads/master 5485d200f -> 2144590dd


CAMEL-8507 Added Support for the mandatory and immediate flags


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2144590d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2144590d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2144590d

Branch: refs/heads/master
Commit: 2144590dd83d4eea142faa2a47b5b34b0130a7d3
Parents: 5485d20
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Thu Mar 19 20:30:59 2015 +0800
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Thu Mar 19 20:33:04 2015 +0800

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQConstants.java   |  4 +++-
 .../component/rabbitmq/RabbitMQEndpoint.java    | 20 ++++++++++++++++++++
 .../component/rabbitmq/RabbitMQProducer.java    | 13 +++++++++----
 3 files changed, 32 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2144590d/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
index b1f4a0c..f2e5568 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
@@ -17,7 +17,7 @@
 package org.apache.camel.component.rabbitmq;
 
 public final class RabbitMQConstants {
-    
+    // TODO need to change the constant which is start with camel
     public static final String ROUTING_KEY = "rabbitmq.ROUTING_KEY";
     public static final String EXCHANGE_NAME = "rabbitmq.EXCHANGE_NAME";
     public static final String CONTENT_TYPE = "rabbitmq.CONTENT_TYPE";
@@ -35,6 +35,8 @@ public final class RabbitMQConstants {
     public static final String TIMESTAMP = "rabbitmq.TIMESTAMP";
     public static final String APP_ID = "rabbitmq.APP_ID";
     public static final String REQUEUE = "rabbitmq.REQUEUE";
+    public static final String MANDATORY = "rabbitmq.MANDATORY";
+    public static final String IMMEDIATE = "rabbitmq.IMMEDIATE";
     public static final String RABBITMQ_DEAD_LETTER_EXCHANGE = 
"x-dead-letter-exchange";
     public static final String RABBITMQ_DEAD_LETTER_ROUTING_KEY = 
"x-dead-letter-routing-key";
     

http://git-wip-us.apache.org/repos/asf/camel/blob/2144590d/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index ea311d7..26ad0b5 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -139,6 +139,10 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     //Maximum time (in milliseconds) waiting for channel
     @UriParam(defaultValue = "1000")
     private long channelPoolMaxWait = 1000;
+    @UriParam(defaultValue = "false")
+    private boolean mandatory;
+    @UriParam(defaultValue = "false")
+    private boolean immediate;
     @UriParam
     private ArgsConfigurer queueArgsConfigurer;
     @UriParam
@@ -633,6 +637,22 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
         this.channelPoolMaxWait = channelPoolMaxWait;
     }
 
+    public boolean isMandatory() {
+        return mandatory;
+    }
+
+    public void setMandatory(boolean mandatory) {
+        this.mandatory = mandatory;
+    }
+
+    public boolean isImmediate() {
+        return immediate;
+    }
+
+    public void setImmediate(boolean immediate) {
+        this.immediate = immediate;
+    }
+
     /**
      * Get the configurer for setting the queue args in Channel.queueDeclare
      * @return

http://git-wip-us.apache.org/repos/asf/camel/blob/2144590d/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 6652544..28858a6 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -139,8 +139,10 @@ public class RabbitMQProducer extends DefaultProducer {
         }
         byte[] messageBodyBytes = 
exchange.getIn().getMandatoryBody(byte[].class);
         AMQP.BasicProperties properties = buildProperties(exchange).build();
-
-        basicPublish(exchangeName, key, properties, messageBodyBytes);
+        Boolean mandatory = 
exchange.getIn().getHeader(RabbitMQConstants.MANDATORY, 
getEndpoint().isMandatory(), Boolean.class);
+        Boolean immediate = 
exchange.getIn().getHeader(RabbitMQConstants.IMMEDIATE, 
getEndpoint().isImmediate(), Boolean.class);
+        
+        basicPublish(exchangeName, key, mandatory, immediate, properties, 
messageBodyBytes);
     }
 
     /**
@@ -148,10 +150,13 @@ public class RabbitMQProducer extends DefaultProducer {
      *
      * @param exchange   Target exchange
      * @param routingKey Routing key
+     * @param mandatory  This flag tells the server how to react if the 
message cannot be routed to a queue.
+     * @param immediate  This flag tells the server how to react if the 
message cannot be routed to a queue consumer immediately.
      * @param properties Header properties
      * @param body       Body content
      */
-    private void basicPublish(final String exchange, final String routingKey, 
final AMQP.BasicProperties properties, final byte[] body) throws Exception {
+    private void basicPublish(final String exchange, final String routingKey, 
final boolean mandatory, final boolean immediate,  
+                              final AMQP.BasicProperties properties, final 
byte[] body) throws Exception {
         if (channelPool == null) {
             // Open connection and channel lazily
             openConnectionAndChannelPool();
@@ -159,7 +164,7 @@ public class RabbitMQProducer extends DefaultProducer {
         execute(new ChannelCallback<Void>() {
             @Override
             public Void doWithChannel(Channel channel) throws Exception {
-                channel.basicPublish(exchange, routingKey, properties, body);
+                channel.basicPublish(exchange, routingKey, mandatory, 
immediate, properties, body);
                 return null;
             }
         });

Reply via email to